diff --git a/contrib/parallel_dummy/Makefile b/contrib/parallel_dummy/Makefile new file mode 100644 index 0000000..de00f50 --- /dev/null +++ b/contrib/parallel_dummy/Makefile @@ -0,0 +1,19 @@ +MODULE_big = parallel_dummy +OBJS = parallel_dummy.o $(WIN32RES) +PGFILEDESC = "parallel_dummy - dummy use of parallel infrastructure" + +EXTENSION = parallel_dummy +DATA = parallel_dummy--1.0.sql + +REGRESS = parallel_dummy + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/parallel_dummy +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/parallel_dummy/parallel_dummy--1.0.sql b/contrib/parallel_dummy/parallel_dummy--1.0.sql new file mode 100644 index 0000000..3c0ae7d --- /dev/null +++ b/contrib/parallel_dummy/parallel_dummy--1.0.sql @@ -0,0 +1,12 @@ +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION parallel_dummy" to load this file. \quit + +CREATE FUNCTION parallel_sleep(sleep_time pg_catalog.int4, + nworkers pg_catalog.int4) + RETURNS pg_catalog.void STRICT + AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION parallel_count(rel pg_catalog.regclass, + nworkers pg_catalog.int4) + RETURNS pg_catalog.int8 STRICT + AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/contrib/parallel_dummy/parallel_dummy.c b/contrib/parallel_dummy/parallel_dummy.c new file mode 100644 index 0000000..0a32ea8 --- /dev/null +++ b/contrib/parallel_dummy/parallel_dummy.c @@ -0,0 +1,238 @@ +/*-------------------------------------------------------------------------- + * + * parallel_dummy.c + * Test harness code for parallel mode code. 
+ * + * Copyright (C) 2013-2014, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/parallel_dummy/parallel_dummy.c + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/heapam.h" +#include "access/parallel.h" +#include "access/xact.h" +#include "fmgr.h" +#include "miscadmin.h" +#include "storage/bufmgr.h" +#include "storage/spin.h" +#include "utils/builtins.h" +#include "utils/snapmgr.h" +#include "utils/tqual.h" + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(parallel_sleep); +PG_FUNCTION_INFO_V1(parallel_count); + +#define PARALLEL_DUMMY_KEY 1 + +typedef struct +{ + int32 sleep_time; +} ParallelSleepInfo; + +typedef struct +{ + int32 relid; + slock_t mutex; + BlockNumber lastblock; + BlockNumber currentblock; + int64 ntuples; +} ParallelCountInfo; + +void _PG_init(void); +void sleep_worker_main(dsm_segment *seg, shm_toc *toc); +void count_worker_main(dsm_segment *seg, shm_toc *toc); + +static void count_helper(Relation rel, ParallelCountInfo *info); + +Datum +parallel_sleep(PG_FUNCTION_ARGS) +{ + int32 sleep_time = PG_GETARG_INT32(0); + int32 nworkers = PG_GETARG_INT32(1); + bool already_in_parallel_mode = IsInParallelMode(); + ParallelContext *pcxt; + ParallelSleepInfo *info; + + if (nworkers < 0) + ereport(ERROR, + (errmsg("number of parallel workers must be non-negative"))); + + if (!already_in_parallel_mode) + EnterParallelMode(); + + pcxt = CreateParallelContextForExtension("parallel_dummy", + "sleep_worker_main", + nworkers); + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelSleepInfo)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + InitializeParallelDSM(pcxt); + info = shm_toc_allocate(pcxt->toc, sizeof(ParallelSleepInfo)); + info->sleep_time = sleep_time; + shm_toc_insert(pcxt->toc, PARALLEL_DUMMY_KEY, info); + LaunchParallelWorkers(pcxt); + + /* here's where we do the "real work" ... 
*/ + DirectFunctionCall1(pg_sleep, Float8GetDatum((float8) sleep_time)); + + WaitForParallelWorkersToFinish(pcxt); + DestroyParallelContext(pcxt); + + if (!already_in_parallel_mode) + ExitParallelMode(); + + PG_RETURN_VOID(); +} + +Datum +parallel_count(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + int32 nworkers = PG_GETARG_INT32(1); + bool already_in_parallel_mode = IsInParallelMode(); + ParallelContext *pcxt; + ParallelCountInfo *info; + Relation rel; + int64 result; + + if (nworkers < 0) + ereport(ERROR, + (errmsg("number of parallel workers must be non-negative"))); + + rel = relation_open(relid, AccessShareLock); + + if (!already_in_parallel_mode) + EnterParallelMode(); + + pcxt = CreateParallelContextForExtension("parallel_dummy", + "count_worker_main", + nworkers); + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(ParallelCountInfo)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + InitializeParallelDSM(pcxt); + info = shm_toc_allocate(pcxt->toc, sizeof(ParallelCountInfo)); + info->relid = relid; + SpinLockInit(&info->mutex); + info->lastblock = RelationGetNumberOfBlocks(rel); + info->currentblock = 0; + info->ntuples = 0; + shm_toc_insert(pcxt->toc, PARALLEL_DUMMY_KEY, info); + LaunchParallelWorkers(pcxt); + + /* here's where we do the "real work" ... */ + count_helper(rel, info); + + WaitForParallelWorkersToFinish(pcxt); + + result = info->ntuples; + + DestroyParallelContext(pcxt); + + relation_close(rel, AccessShareLock); + + if (!already_in_parallel_mode) + ExitParallelMode(); + + PG_RETURN_INT64(result); +} + +void +sleep_worker_main(dsm_segment *seg, shm_toc *toc) +{ + ParallelSleepInfo *info; + + info = shm_toc_lookup(toc, PARALLEL_DUMMY_KEY); + Assert(info != NULL); + + /* here's where we do the "real work" ... 
*/ + DirectFunctionCall1(pg_sleep, Float8GetDatum((float8) info->sleep_time)); +} + +void +count_worker_main(dsm_segment *seg, shm_toc *toc) +{ + ParallelCountInfo *info; + Relation rel; + + info = shm_toc_lookup(toc, PARALLEL_DUMMY_KEY); + Assert(info != NULL); + + rel = relation_open(info->relid, AccessShareLock); + count_helper(rel, info); + relation_close(rel, AccessShareLock); +} + +static void +count_helper(Relation rel, ParallelCountInfo *info) +{ + int64 ntuples = 0; + int64 mytuples = 0; + Oid relid = info->relid; + Snapshot snapshot = GetActiveSnapshot(); + + for (;;) + { + BlockNumber blkno; + Buffer buffer; + Page page; + int lines; + OffsetNumber lineoff; + ItemId lpp; + bool all_visible; + bool done = false; + + CHECK_FOR_INTERRUPTS(); + + SpinLockAcquire(&info->mutex); + if (info->currentblock >= info->lastblock) + done = true; + else + blkno = info->currentblock++; + info->ntuples += ntuples; + SpinLockRelease(&info->mutex); + + mytuples += ntuples; + if (done) + break; + + buffer = ReadBuffer(rel, blkno); + LockBuffer(buffer, BUFFER_LOCK_SHARE); + page = BufferGetPage(buffer); + lines = PageGetMaxOffsetNumber(page); + ntuples = 0; + + all_visible = PageIsAllVisible(page) && !snapshot->takenDuringRecovery; + + for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(page, lineoff); + lineoff <= lines; + lineoff++, lpp++) + { + HeapTupleData loctup; + + if (!ItemIdIsNormal(lpp)) + continue; + if (all_visible) + { + ++ntuples; + continue; + } + + loctup.t_tableOid = relid; + loctup.t_data = (HeapTupleHeader) PageGetItem(page, lpp); + loctup.t_len = ItemIdGetLength(lpp); + + if (HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer)) + ++ntuples; + } + + UnlockReleaseBuffer(buffer); + } + + elog(NOTICE, "PID %d counted " INT64_FORMAT " tuples", MyProcPid, mytuples); +} diff --git a/contrib/parallel_dummy/parallel_dummy.control b/contrib/parallel_dummy/parallel_dummy.control new file mode 100644 index 0000000..90bae3f --- /dev/null +++ 
b/contrib/parallel_dummy/parallel_dummy.control @@ -0,0 +1,4 @@ +comment = 'Dummy parallel code' +default_version = '1.0' +module_pathname = '$libdir/parallel_dummy' +relocatable = true diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 61216e5..8e3ccf4 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -546,6 +546,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) switch (event) { + case XACT_EVENT_PARALLEL_PRE_COMMIT: case XACT_EVENT_PRE_COMMIT: /* Commit all remote transactions during pre-commit */ do_sql_command(entry->conn, "COMMIT TRANSACTION"); @@ -588,11 +589,13 @@ pgfdw_xact_callback(XactEvent event, void *arg) (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot prepare a transaction that modified remote tables"))); break; + case XACT_EVENT_PARALLEL_COMMIT: case XACT_EVENT_COMMIT: case XACT_EVENT_PREPARE: /* Pre-commit should have closed the open transaction */ elog(ERROR, "missed cleaning up connection during pre-commit"); break; + case XACT_EVENT_PARALLEL_ABORT: case XACT_EVENT_ABORT: /* Assume we might have lost track of prepared statements */ entry->have_error = true; diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 24e300c..eee1ca0 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2234,6 +2234,17 @@ static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options) { + /* + * For now, parallel operations are required to be strictly read-only. + * Unlike heap_update() and heap_delete(), an insert should never create + * a combo CID, so it might be possible to relax this restriction, but + * not without more thought and testing. 
+ */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot insert tuples during a parallel operation"))); + if (relation->rd_rel->relhasoids) { #ifdef NOT_USED @@ -2641,6 +2652,16 @@ heap_delete(Relation relation, ItemPointer tid, Assert(ItemPointerIsValid(tid)); + /* + * Forbid this during a parallel operation, lest it allocate a combocid. + * Other workers might need that combocid for visibility checks, and we + * have no provision for broadcasting it to them. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot delete tuples during a parallel operation"))); + block = ItemPointerGetBlockNumber(tid); buffer = ReadBuffer(relation, block); page = BufferGetPage(buffer); @@ -3079,6 +3100,16 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, Assert(ItemPointerIsValid(otid)); /* + * Forbid this during a parallel operation, lest it allocate a combocid. + * Other workers might need that combocid for visibility checks, and we + * have no provision for broadcasting it to them. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot update tuples during a parallel operation"))); + + /* * Fetch the list of attributes to be checked for HOT update. This is * wasted effort if we fail to update or have to put the new tuple on a * different page. But we must compute the list before obtaining buffer @@ -5382,6 +5413,17 @@ heap_inplace_update(Relation relation, HeapTuple tuple) uint32 oldlen; uint32 newlen; + /* + * For now, parallel operations are required to be strictly read-only. + * Unlike a regular update, this should never create a combo CID, so it + * might be possible to relax this restriction, but not without more + * thought and testing. It's not clear that it would be useful, anyway. 
+ */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot update tuples during a parallel operation"))); + buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&(tuple->t_self))); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); page = (Page) BufferGetPage(buffer); diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 9d4d5db..94455b2 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -12,7 +12,7 @@ subdir = src/backend/access/transam top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = clog.o commit_ts.o multixact.o rmgr.o slru.o subtrans.o \ +OBJS = clog.o commit_ts.o multixact.o parallel.o rmgr.o slru.o subtrans.o \ timeline.o transam.o twophase.o twophase_rmgr.o varsup.o \ xact.o xlog.o xlogarchive.o xlogfuncs.o \ xloginsert.o xlogreader.o xlogutils.o diff --git a/src/backend/access/transam/README.parallel b/src/backend/access/transam/README.parallel new file mode 100644 index 0000000..e427275 --- /dev/null +++ b/src/backend/access/transam/README.parallel @@ -0,0 +1,36 @@ +Overview +======== + +Before beginning any parallel operation, call EnterParallelMode(); after all +parallel operations are completed, call ExitParallelMode(). These functions +don't launch any workers or directly enable parallelism, but they put in place +a variety of prohibitions required to make parallelism safe. + +To actually parallelize a particular operation, use a ParallelContext. This +establishes a dynamic shared memory segment and registers dynamic background +workers which will attach to that segment. We arrange to synchronize various +pieces of state - such as, most simply, the database and user OIDs - from the +backend that is initiating parallelism to all of the background workers +launched via a ParallelContext. 
The basic coding pattern looks like this: + + EnterParallelMode(); /* prohibit unsafe state changes */ + + pcxt = CreateParallelContext(entrypoint, nworkers); + + /* Allow space for application-specific data here. */ + shm_toc_estimate_chunk(&pcxt->estimator, size); + shm_toc_estimate_keys(&pcxt->estimator, keys); + + InitializeParallelDSM(pcxt); /* create DSM and copy state to it */ + + /* Store the data for which we reserved space. */ + space = shm_toc_allocate(pcxt->toc, size); + shm_toc_insert(pcxt->toc, key, space); + + LaunchParallelWorkers(pcxt); + + /* do parallel stuff */ + + DestroyParallelContext(pcxt); + + ExitParallelMode(); diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c new file mode 100644 index 0000000..71374cc --- /dev/null +++ b/src/backend/access/transam/parallel.c @@ -0,0 +1,886 @@ +/*------------------------------------------------------------------------- + * + * parallel.c + * Infrastructure for launching parallel workers + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/transam/parallel.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xact.h" +#include "access/parallel.h" +#include "commands/async.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "libpq/pqmq.h" +#include "miscadmin.h" +#include "storage/ipc.h" +#include "storage/sinval.h" +#include "storage/spin.h" +#include "utils/combocid.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/resowner.h" +#include "utils/snapmgr.h" + +/* + * We don't want to waste a lot of memory on an error queue which, most of + * the time, will process only a handful of small messages. 
However, it is + * desirable to make it large enough that a typical ErrorResponse can be sent + * without blocking. That way, a worker that errors out can write the whole + * message into the queue and terminate without waiting for the user backend. + */ +#define PARALLEL_ERROR_QUEUE_SIZE 16384 + +/* Magic number for parallel context TOC. */ +#define PARALLEL_MAGIC 0x50477c7c + +/* + * Magic numbers for parallel state sharing. Higher-level code should use + * smaller values, leaving these very large ones for use by this module. + */ +#define PARALLEL_KEY_FIXED UINT64CONST(0xFFFFFFFFFFFF0001) +#define PARALLEL_KEY_ERROR_QUEUE UINT64CONST(0xFFFFFFFFFFFF0002) +#define PARALLEL_KEY_GUC UINT64CONST(0xFFFFFFFFFFFF0003) +#define PARALLEL_KEY_COMBO_CID UINT64CONST(0xFFFFFFFFFFFF0004) +#define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0005) +#define PARALLEL_KEY_ACTIVE_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006) +#define PARALLEL_KEY_TRANSACTION_STATE UINT64CONST(0xFFFFFFFFFFFF0007) +#define PARALLEL_KEY_EXTENSION_TRAMPOLINE UINT64CONST(0xFFFFFFFFFFFF0008) + +/* Fixed-size parallel state. */ +typedef struct FixedParallelState +{ + /* Fixed-size state that workers must restore. */ + Oid database_id; + Oid authenticated_user_id; + Oid current_user_id; + int sec_context; + PGPROC *parallel_master_pgproc; + pid_t parallel_master_pid; + BackendId parallel_master_backend_id; + + /* Entrypoint for parallel workers. */ + parallel_worker_main_type entrypoint; + + /* Track whether workers have attached. */ + slock_t mutex; + int workers_expected; + int workers_attached; +} FixedParallelState; + +/* + * Our parallel worker number. We initialize this to -1, meaning that we are + * not a parallel worker. In parallel workers, it will be set to a value >= 0 + * and < the number of workers before any user code is invoked; each parallel + * worker will get a different parallel worker number. 
+ */ +int ParallelWorkerNumber = -1; + +/* Is there a parallel message pending which we need to receive? */ +bool ParallelMessagePending = false; + +/* Are we in the midst of handling parallel messages? */ +static bool HandlingParallelMessages = false; + +/* List of active parallel contexts. */ +static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list); + +/* Private functions. */ +static void HandleParallelMessages(void); +static void HandleParallelMessage(ParallelContext *, int, StringInfo msg); +static void ParallelErrorContext(void *arg); +static void ParallelMain(Datum main_arg); +static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc); +static void handle_sigterm(SIGNAL_ARGS); + +/* + * Establish a new parallel context. This should be done after entering + * parallel mode, and (unless there is an error) the context should be + * destroyed before exiting the current subtransaction. + */ +ParallelContext * +CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers) +{ + MemoryContext oldcontext; + ParallelContext *pcxt; + + /* It is unsafe to create a parallel context if not in parallel mode. */ + Assert(IsInParallelMode()); + + /* Number of workers should be non-negative (zero is allowed). */ + Assert(nworkers >= 0); + + /* We might be running in a very short-lived memory context. */ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + + /* Initialize a new ParallelContext. */ + pcxt = palloc0(sizeof(ParallelContext)); + pcxt->subid = GetCurrentSubTransactionId(); + pcxt->nworkers = nworkers; + pcxt->entrypoint = entrypoint; + shm_toc_initialize_estimator(&pcxt->estimator); + dlist_push_head(&pcxt_list, &pcxt->node); + + /* Restore previous memory context. */ + MemoryContextSwitchTo(oldcontext); + + return pcxt; +} + +/* + * Establish a new parallel context that calls a function provided by an + * extension. This works around the fact that the library might get mapped + * at a different address in each backend. 
+ */ +ParallelContext * +CreateParallelContextForExtension(char *library_name, char *function_name, + int nworkers) +{ + MemoryContext oldcontext; + ParallelContext *pcxt; + + /* We might be running in a very short-lived memory context. */ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + + /* Create the context. */ + pcxt = CreateParallelContext(ParallelExtensionTrampoline, nworkers); + pcxt->library_name = pstrdup(library_name); + pcxt->function_name = pstrdup(function_name); + + /* Restore previous memory context. */ + MemoryContextSwitchTo(oldcontext); + + return pcxt; +} + +/* + * Establish the dynamic shared memory segment for a parallel context and + * copy state and other bookkeeping information that will be needed by + * parallel workers into it. + */ +void +InitializeParallelDSM(ParallelContext *pcxt) +{ + MemoryContext oldcontext; + Size guc_len; + Size combocidlen; + Size tsnaplen; + Size asnaplen; + Size tstatelen; + Size segsize; + int i; + FixedParallelState *fps; + char *gucspace; + char *combocidspace; + char *tsnapspace; + char *asnapspace; + char *tstatespace; + char *error_queue_space; + Snapshot transaction_snapshot = GetTransactionSnapshot(); + Snapshot active_snapshot = GetActiveSnapshot(); + + /* We might be running in a very short-lived memory context. */ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + + /* Allocate space for worker information. */ + pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers); + + /* + * Estimate how much space we'll need for state sharing. + * + * If you add more chunks here, you probably need more keys, too. 
+ */ + shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState)); + guc_len = EstimateGUCStateSpace(); + shm_toc_estimate_chunk(&pcxt->estimator, guc_len); + combocidlen = EstimateComboCIDStateSpace(); + shm_toc_estimate_chunk(&pcxt->estimator, combocidlen); + tsnaplen = EstimateSnapshotSpace(transaction_snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen); + asnaplen = EstimateSnapshotSpace(active_snapshot); + shm_toc_estimate_chunk(&pcxt->estimator, asnaplen); + tstatelen = EstimateTransactionStateSpace(); + shm_toc_estimate_chunk(&pcxt->estimator, tstatelen); + shm_toc_estimate_keys(&pcxt->estimator, 6); + + /* Estimate how much space we'll need for error queues. */ + StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) == + PARALLEL_ERROR_QUEUE_SIZE, + "parallel error queue size not buffer-aligned"); + shm_toc_estimate_chunk(&pcxt->estimator, + PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers); + shm_toc_estimate_keys(&pcxt->estimator, 1); + + /* Estimate how much we'll need for extension entrypoint information. */ + if (pcxt->library_name != NULL) + { + Assert(pcxt->entrypoint == ParallelExtensionTrampoline); + Assert(pcxt->function_name != NULL); + shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) + + strlen(pcxt->function_name) + 2); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + + /* Create DSM and initialize with new table of contents. */ + segsize = shm_toc_estimate(&pcxt->estimator); + pcxt->seg = dsm_create(segsize); + pcxt->toc = shm_toc_create(PARALLEL_MAGIC, + dsm_segment_address(pcxt->seg), + segsize); + + /* Initialize fixed-size state in shared memory. 
*/ + fps = (FixedParallelState *) + shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState)); + fps->database_id = MyDatabaseId; + fps->authenticated_user_id = GetAuthenticatedUserId(); + GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context); + fps->parallel_master_pgproc = MyProc; + fps->parallel_master_pid = MyProcPid; + fps->parallel_master_backend_id = MyBackendId; + fps->entrypoint = pcxt->entrypoint; + SpinLockInit(&fps->mutex); + fps->workers_expected = pcxt->nworkers; + fps->workers_attached = 0; + shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps); + + /* Serialize GUC state to dynamic shared memory. */ + gucspace = shm_toc_allocate(pcxt->toc, guc_len); + SerializeGUCState(guc_len, gucspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace); + + /* Serialize combo CID state to dynamic shared memory. */ + combocidspace = shm_toc_allocate(pcxt->toc, combocidlen); + SerializeComboCIDState(combocidlen, combocidspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace); + + /* Serialize transaction snapshots to dynamic shared memory. */ + tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen); + SerializeSnapshot(transaction_snapshot, tsnaplen, tsnapspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, tsnapspace); + asnapspace = shm_toc_allocate(pcxt->toc, asnaplen); + SerializeSnapshot(active_snapshot, asnaplen, asnapspace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace); + + /* Serialize transaction state to dynamic shared memory. */ + tstatespace = shm_toc_allocate(pcxt->toc, tstatelen); + SerializeTransactionState(tstatelen, tstatespace); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace); + + /* + * Establish error queues in dynamic shared memory. + * + * These queues should be used only for transmitting ErrorResponse, + * NoticeResponse, and NotifyResponse protocol messages. Tuple data should + * be transmitted via separate (possibly larger?) queue. 
+ */ + error_queue_space = + shm_toc_allocate(pcxt->toc, PARALLEL_ERROR_QUEUE_SIZE * pcxt->nworkers); + for (i = 0; i < pcxt->nworkers; ++i) + { + shm_mq *mq; + + mq = shm_mq_create(error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE, + PARALLEL_ERROR_QUEUE_SIZE); + shm_mq_set_receiver(mq, MyProc); + pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL); + } + shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space); + + /* Serialize extension entrypoint information to dynamic shared memory. */ + if (pcxt->library_name != NULL) + { + Size lnamelen = strlen(pcxt->library_name); + char *extensionstate; + + extensionstate = shm_toc_allocate(pcxt->toc, lnamelen + + strlen(pcxt->function_name) + 2); + strcpy(extensionstate, pcxt->library_name); + strcpy(extensionstate + lnamelen + 1, pcxt->function_name); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE, + extensionstate); + } + + /* Restore previous memory context. */ + MemoryContextSwitchTo(oldcontext); +} + +/* + * Launch parallel workers. + */ +void +LaunchParallelWorkers(ParallelContext *pcxt) +{ + MemoryContext oldcontext; + BackgroundWorker worker; + int i; + + /* We might be running in a very short-lived memory context. */ + oldcontext = MemoryContextSwitchTo(TopTransactionContext); + + /* Configure a worker. */ + snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d", + MyProcPid); + worker.bgw_flags = + BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + worker.bgw_restart_time = BGW_NEVER_RESTART; + worker.bgw_main = ParallelMain; + worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg)); + worker.bgw_notify_pid = MyProcPid; + + /* + * Start workers. + * + * The caller must be able to tolerate ending up with fewer workers than + * expected, so there is no need to throw an error here if registration + * fails. 
It wouldn't help much anyway, because registering the worker + * in no way guarantees that it will start up and initialize successfully. + * We do, however, give up on registering any more workers once + * registration fails the first time; no sense beating our head against + * a brick wall. + */ + for (i = 0; i < pcxt->nworkers; ++i) + { + if (RegisterDynamicBackgroundWorker(&worker, + &pcxt->worker[i].bgwhandle)) + shm_mq_set_handle(pcxt->worker[i].error_mqh, + pcxt->worker[i].bgwhandle); + else + { + /* + * Registration failed, so give up as promised above: mark this + * slot and every slot after it as never launched, so that + * nobody waits on an error queue that has no sender. Advancing + * i to nworkers here also terminates the enclosing loop. + */ + for (; i < pcxt->nworkers; ++i) + { + pcxt->worker[i].bgwhandle = NULL; + pcxt->worker[i].error_mqh = NULL; + } + } + } + + /* Restore previous memory context. */ + MemoryContextSwitchTo(oldcontext); +} + +/* + * Wait for all workers to exit cleanly. + */ +void +WaitForParallelWorkersToFinish(ParallelContext *pcxt) +{ + for (;;) + { + bool anyone_alive = false; + int i; + + /* + * This will process any parallel messages that are pending, which + * may change the outcome of the loop that follows. It may also + * throw an error propagated from a worker. + */ + CHECK_FOR_INTERRUPTS(); + + for (i = 0; i < pcxt->nworkers; ++i) + { + if (pcxt->worker[i].error_mqh != NULL) + { + anyone_alive = true; + break; + } + } + + if (!anyone_alive) + break; + + WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1); + ResetLatch(&MyProc->procLatch); + } +} + +/* + * Destroy a parallel context. + * + * If expecting a clean exit, you should use WaitForParallelWorkersToFinish() + * first, before calling this function. When this function is invoked, any + * remaining workers are forcibly killed and the dynamic shared memory segment + * is immediately unmapped. 
+ */ +void +DestroyParallelContext(ParallelContext *pcxt) +{ + int i; + + /* + * Be careful about order of operations here! We remove the parallel + * context from the list before we do anything else; otherwise, if an + * error occurs during a subsequent step, we might try to nuke it again + * from AtEOXact_Parallel or AtEOSubXact_Parallel. 
+ */ + dlist_delete(&pcxt->node); + + /* + * Kill and forget about each worker in turn. The worker array is only + * allocated by InitializeParallelDSM(), so it is still NULL when we are + * cleaning up a context that never got that far; in that case there is + * nothing to terminate and nothing to free. + */ + if (pcxt->worker != NULL) + { + for (i = 0; i < pcxt->nworkers; ++i) + { + if (pcxt->worker[i].bgwhandle != NULL) + { + TerminateBackgroundWorker(pcxt->worker[i].bgwhandle); + pfree(pcxt->worker[i].bgwhandle); + } + if (pcxt->worker[i].error_mqh != NULL) + pfree(pcxt->worker[i].error_mqh); + } + + /* Free the worker array itself. */ + pfree(pcxt->worker); + pcxt->worker = NULL; + } + + /* + * If we have allocated a shared memory segment, detach it. This will + * implicitly detach the error queues, and any other shared memory queues, + * stored there. + */ + if (pcxt->seg != NULL) + dsm_detach(pcxt->seg); + + /* Free the names copied by CreateParallelContextForExtension(), if any. */ + if (pcxt->library_name != NULL) + pfree(pcxt->library_name); + if (pcxt->function_name != NULL) + pfree(pcxt->function_name); + + /* Free memory. */ + pfree(pcxt); +} + +/* + * Are there any parallel contexts currently active? + */ +bool +ParallelContextActive(void) +{ + return !dlist_is_empty(&pcxt_list); +} + +/* + * Handle receipt of an interrupt indicating a parallel worker message. + * + * If signal_handler is true, we are being called from a signal handler and must + * be extremely cautious about what we do here! + */ +void +HandleParallelMessageInterrupt(bool signal_handler) +{ + int save_errno = errno; + + /* Don't joggle the elbow of proc_exit */ + if (!proc_exit_inprogress) + { + InterruptPending = true; + ParallelMessagePending = true; + + /* + * If it's safe to interrupt, service the interrupt immediately. + * (We shouldn't be in parallel mode if waiting for the user to send + * a new query, but we could be waiting for a lock.) 
+ */ + if ((ImmediateInterruptOK || !signal_handler) + && InterruptHoldoffCount == 0 && CritSectionCount == 0 + && !HandlingParallelMessages) + { + bool notify_enabled; + bool catchup_enabled; + bool save_ImmediateInterruptOK; + + /* + * Disable everything that might recursively interrupt us. 
+ * + * If there were any possibility that disabling and re-enabling + * interrupts or handling parallel messages might take a lock, we'd + * need to HOLD_INTERRUPTS() as well, since taking a lock might + * cause ImmediateInterruptOK to get temporarily reset to true. + * But that shouldn't happen, so this is (hopefully) safe. That's + * good, because it lets us respond to query cancel and die + * interrupts while we're in the midst of message-processing. + */ + save_ImmediateInterruptOK = ImmediateInterruptOK; + ImmediateInterruptOK = false; + notify_enabled = DisableNotifyInterrupt(); + catchup_enabled = DisableCatchupInterrupt(); + HandlingParallelMessages = true; + + /* OK, do the work... */ + HandleParallelMessages(); + + /* Now re-enable whatever was enabled before */ + HandlingParallelMessages = false; + if (catchup_enabled) + EnableCatchupInterrupt(); + if (notify_enabled) + EnableNotifyInterrupt(); + ImmediateInterruptOK = save_ImmediateInterruptOK; + } + } + + errno = save_errno; +} + +/* + * Handle any queued protocol messages received from parallel workers. + */ +static void +HandleParallelMessages(void) +{ + dlist_iter iter; + + ParallelMessagePending = false; + + dlist_foreach(iter, &pcxt_list) + { + ParallelContext *pcxt; + int i; + Size nbytes; + void *data; + + pcxt = dlist_container(ParallelContext, node, iter.cur); + if (pcxt->worker == NULL) + continue; + + for (i = 0; i < pcxt->nworkers; ++i) + { + /* + * Read messages for as long as we have an error queue; if we + * have hit (or hit while reading) ReadyForQuery, this will go to + * NULL. 
+ */ + while (pcxt->worker[i].error_mqh != NULL) + { + shm_mq_result res; + + CHECK_FOR_INTERRUPTS(); + + res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes, + &data, true); + + /* + * We asked not to wait, so an empty queue is reported as + * SHM_MQ_WOULD_BLOCK. Move on to the next worker instead + * of spinning here until this one sends something; a later + * parallel-message interrupt is expected to bring us back. + */ + if (res == SHM_MQ_WOULD_BLOCK) + break; + + if (res == SHM_MQ_SUCCESS) + { + StringInfoData msg; + + initStringInfo(&msg); + appendBinaryStringInfo(&msg, data, nbytes); + HandleParallelMessage(pcxt, i, &msg); + pfree(msg.data); + } + else if (res == SHM_MQ_DETACHED) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), /* XXX: wrong errcode? */ + errmsg("lost connection to parallel worker"))); + } + } + } +} + +/* + * Handle a single protocol message received from a single parallel worker. + */ +static void +HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) +{ + char msgtype; + + msgtype = pq_getmsgbyte(msg); + + switch (msgtype) + { + case 'E': + case 'N': + { + ErrorData edata; + + /* Parse ErrorResponse or NoticeResponse. */ + pq_parse_errornotice(msg, &edata); + + /* Death of a worker is insufficient justification for suicide. */ + edata.elevel = Min(edata.elevel, ERROR); + + /* + * XXX. We should probably use the error context callbacks in + * effect at the time the parallel context was created. + */ + + ThrowErrorData(&edata); + + break; + } + + case 'A': + { + /* Propagate NotifyResponse. */ + pq_putmessage(msg->data[0], &msg->data[1], msg->len - 1); + break; + } + + case 'Z': + { + /* ReadyForQuery indicates that this worker exits cleanly. 
*/ + pfree(pcxt->worker[i].bgwhandle); + pfree(pcxt->worker[i].error_mqh); + pcxt->worker[i].bgwhandle = NULL; + pcxt->worker[i].error_mqh = NULL; + break; + } + + default: + { + elog(ERROR, "unknown message type: %c (%d bytes)", + msgtype, msg->len); + } + } +} + +/* + * End-of-subtransaction cleanup for parallel contexts. + * + * Currently, it's forbidden to enter or leave a subtransaction while + * parallel mode is in effect, so we could just blow away everything. 
But + * we may want to relax that restriction in the future, so this code + * contemplates that there may be multiple subtransaction IDs in pcxt_list. + */ +void +AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId) +{ + HandlingParallelMessages = false; + + while (!dlist_is_empty(&pcxt_list)) + { + ParallelContext *pcxt; + + pcxt = dlist_head_element(ParallelContext, node, &pcxt_list); + if (pcxt->subid != mySubId) + break; + if (isCommit) + elog(WARNING, "leaked parallel context"); + DestroyParallelContext(pcxt); + } +} + +/* + * End-of-transaction cleanup for parallel contexts. + */ +void +AtEOXact_Parallel(bool isCommit) +{ + HandlingParallelMessages = false; + + while (!dlist_is_empty(&pcxt_list)) + { + ParallelContext *pcxt; + + pcxt = dlist_head_element(ParallelContext, node, &pcxt_list); + if (isCommit) + elog(WARNING, "leaked parallel context"); + DestroyParallelContext(pcxt); + } +} + +/* + * Main entrypoint for parallel workers. + */ +static void +ParallelMain(Datum main_arg) +{ + dsm_segment *seg; + shm_toc *toc; + FixedParallelState *fps; + char *error_queue_space; + shm_mq *mq; + shm_mq_handle *mqh; + char *gucspace; + char *combocidspace; + char *tsnapspace; + char *asnapspace; + char *tstatespace; + ErrorContextCallback errctx; + + /* Establish signal handlers. */ + pqsignal(SIGTERM, handle_sigterm); + BackgroundWorkerUnblockSignals(); + + /* Set up a memory context and resource owner. */ + Assert(CurrentResourceOwner == NULL); + CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel"); + CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext, + "parallel worker", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + /* + * Now that we have a resource owner, we can attach to the dynamic + * shared memory segment and read the table of contents. 
+ */ + seg = dsm_attach(DatumGetInt32(main_arg)); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("unable to map dynamic shared memory segment"))); + toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg)); + if (toc == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("bad magic number in dynamic shared memory segment"))); + + /* Determine and set our worker number. */ + fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED); + Assert(fps != NULL); + Assert(ParallelWorkerNumber == -1); + SpinLockAcquire(&fps->mutex); + if (fps->workers_attached < fps->workers_expected) + ParallelWorkerNumber = fps->workers_attached++; + SpinLockRelease(&fps->mutex); + if (ParallelWorkerNumber < 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("too many parallel workers already attached"))); + + /* + * Now that we have a worker number, we can find and attach to the error + * queue provided for us. That's good, because until we do that, any + * errors that happen here will not be reported back to the process that + * requested that this worker be launched. + */ + error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE); + mq = (shm_mq *) (error_queue_space + + ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE); + shm_mq_set_sender(mq, MyProc); + mqh = shm_mq_attach(mq, seg, NULL); + pq_redirect_to_shm_mq(mq, mqh); + pq_set_parallel_master(fps->parallel_master_pid, + fps->parallel_master_backend_id); + + /* Install an error-context callback. */ + errctx.callback = ParallelErrorContext; + errctx.arg = NULL; + errctx.previous = error_context_stack; + error_context_stack = &errctx; + + /* + * Hooray! Primary initialization is complete. Now, we need to set up + * our backend-local state to match the original backend. + */ + + /* Restore database connection. 
*/ + BackgroundWorkerInitializeConnectionByOid(fps->database_id, + fps->authenticated_user_id); + + /* Restore GUC values from launching backend. */ + gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC); + Assert(gucspace != NULL); + StartTransactionCommand(); + RestoreGUCState(gucspace); + CommitTransactionCommand(); + + /* Handle local_preload_libraries and session_preload_libraries. */ + process_session_preload_libraries(); + + /* Crank up a transaction state appropriate to a parallel worker. */ + tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE); + StartParallelWorkerTransaction(tstatespace); + + /* Restore combo CID state. */ + combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID); + Assert(combocidspace != NULL); + RestoreComboCIDState(combocidspace); + + /* Restore transaction snapshot. */ + tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT); + Assert(tsnapspace != NULL); + RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace), + fps->parallel_master_pgproc); + + /* Restore active snapshot. */ + asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT); + Assert(asnapspace != NULL); + PushActiveSnapshot(RestoreSnapshot(asnapspace)); + + /* Restore user ID and security context. */ + SetUserIdAndSecContext(fps->current_user_id, fps->sec_context); + + /* + * We've initialized all of our state now; nothing should change hereafter. + */ + EnterParallelMode(); + + /* + * Time to do the real work: invoke the caller-supplied code. + * + * If you get a crash at this line, see the comments for + * ParallelExtensionTrampoline. + */ + fps->entrypoint(seg, toc); + + /* Must exit parallel mode to pop active snapshot. */ + ExitParallelMode(); + + /* Must pop active snapshot so resowner.c doesn't complain. */ + PopActiveSnapshot(); + + /* Shut down the parallel-worker transaction. */ + EndParallelWorkerTransaction(); + + /* Report success. 
*/ + ReadyForQuery(DestRemote); +} + +/* + * It's unsafe for the entrypoint invoked by ParallelMain to be a function + * living in a dynamically loaded module, because the module might not be + * loaded in every process, or might be loaded but not at the same address. + * To work around that problem, CreateParallelContextForExtension() arranges + * to call this function rather than calling the extension-provided function + * directly; and this function then looks up the real entrypoint and calls it. + */ +static void +ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc) +{ + char *extensionstate; + char *library_name; + char *function_name; + parallel_worker_main_type entrypt; + + extensionstate = shm_toc_lookup(toc, PARALLEL_KEY_EXTENSION_TRAMPOLINE); + Assert(extensionstate != NULL); + library_name = extensionstate; + function_name = extensionstate + strlen(library_name) + 1; + + entrypt = (parallel_worker_main_type) + load_external_function(library_name, function_name, true, NULL); + entrypt(seg, toc); +} + +/* + * Error context callback installed by ParallelMain. It adds the line + * "parallel worker, pid N", where N is MyProcPid, to the context of + * messages reported by this process. + */ +static void +ParallelErrorContext(void *arg) +{ + errcontext("parallel worker, pid %d", MyProcPid); +} + +/* + * When we receive a SIGTERM, we set InterruptPending and ProcDiePending just + * like a normal backend. The next CHECK_FOR_INTERRUPTS() will do the right + * thing. 
+ */ +static void +handle_sigterm(SIGNAL_ARGS) +{ + int save_errno = errno; + + if (MyProc) + SetLatch(&MyProc->procLatch); + + if (!proc_exit_inprogress) + { + InterruptPending = true; + ProcDiePending = true; + } + + errno = save_errno; +} diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index c541156..92f657e 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -50,6 +50,13 @@ GetNewTransactionId(bool isSubXact) TransactionId xid; /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for new XIDs after that point. + */ + if (IsInParallelMode()) + elog(ERROR, "cannot assign TransactionIds during a parallel operation"); + + /* * During bootstrap initialization, we return the special bootstrap * transaction id. */ diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 8b2f714..76aeef7 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -22,6 +22,7 @@ #include "access/commit_ts.h" #include "access/multixact.h" +#include "access/parallel.h" #include "access/subtrans.h" #include "access/transam.h" #include "access/twophase.h" @@ -49,6 +50,7 @@ #include "storage/procarray.h" #include "storage/sinvaladt.h" #include "storage/smgr.h" +#include "utils/builtins.h" #include "utils/catcache.h" #include "utils/combocid.h" #include "utils/guc.h" @@ -76,6 +78,30 @@ bool XactDeferrable; int synchronous_commit = SYNCHRONOUS_COMMIT_ON; /* + * Only the top frame of the transaction state stack is copied to a parallel + * worker, but GetTopTransactionId() and TransactionIdIsCurrentTransactionId() + * need to return the same answers in the parallel worker as they would have + * in the user backend, so we need some additional bookkeeping. 
+ * + * XactTopTransactionId stores the XID of our toplevel transaction, which + * will be the same as TopTransactionState.transactionId in an ordinary + * backend; but in a parallel backend, which does not have the entire + * transaction state, it will instead be copied from the backend that started + * the parallel operation. + * + * nParallelCurrentXids will be 0 and ParallelCurrentXids NULL in an ordinary + * backend, but in a parallel backend, nParallelCurrentXids will contain the + * number of XIDs that need to be considered current, and ParallelCurrentXids + * will contain the XIDs themselves. This includes all XIDs that were current + * or sub-committed in the parent at the time the parallel operation began. + * The XIDs are stored sorted in numerical order (not logical order) to make + * lookups as fast as possible. + */ +TransactionId XactTopTransactionId = InvalidTransactionId; +int nParallelCurrentXids = 0; +TransactionId *ParallelCurrentXids; + +/* * MyXactAccessedTempRel is set when a temporary relation is accessed. * We don't allow PREPARE TRANSACTION in that case. (This is global * so that it can be set from heapam.c.) @@ -111,6 +137,7 @@ typedef enum TBlockState /* transaction block states */ TBLOCK_BEGIN, /* starting transaction block */ TBLOCK_INPROGRESS, /* live transaction */ + TBLOCK_PARALLEL_INPROGRESS, /* live transaction inside parallel worker */ TBLOCK_END, /* COMMIT received */ TBLOCK_ABORT, /* failed xact, awaiting ROLLBACK */ TBLOCK_ABORT_END, /* failed xact, ROLLBACK received */ @@ -152,6 +179,7 @@ typedef struct TransactionStateData bool prevXactReadOnly; /* entry-time xact r/o state */ bool startedInRecovery; /* did we start in recovery? */ bool didLogXid; /* has xid been included in WAL record? */ + bool parallelMode; /* current transaction in parallel operation? 
*/ struct TransactionStateData *parent; /* back link to parent */ } TransactionStateData; @@ -182,6 +210,7 @@ static TransactionStateData TopTransactionStateData = { false, /* entry-time xact r/o state */ false, /* startedInRecovery */ false, /* didLogXid */ + false, /* parallelMode */ NULL /* link to parent state block */ }; @@ -351,9 +380,9 @@ IsAbortedTransactionBlockState(void) TransactionId GetTopTransactionId(void) { - if (!TransactionIdIsValid(TopTransactionStateData.transactionId)) + if (!TransactionIdIsValid(XactTopTransactionId)) AssignTransactionId(&TopTransactionStateData); - return TopTransactionStateData.transactionId; + return XactTopTransactionId; } /* @@ -366,7 +395,7 @@ GetTopTransactionId(void) TransactionId GetTopTransactionIdIfAny(void) { - return TopTransactionStateData.transactionId; + return XactTopTransactionId; } /* @@ -460,6 +489,13 @@ AssignTransactionId(TransactionState s) Assert(s->state == TRANS_INPROGRESS); /* + * Workers synchronize transaction state at the beginning of each + * parallel operation, so we can't account for new XIDs at this point. + */ + if (IsInParallelMode()) + elog(ERROR, "cannot assign XIDs during a parallel operation"); + + /* * Ensure parent(s) have XIDs, so that a child always has an XID later * than its parent. Musn't recurse here, or we might get a stack overflow * if we're at the bottom of a huge stack of subtransactions none of which @@ -511,6 +547,8 @@ AssignTransactionId(TransactionState s) * the Xid as "running". See GetNewTransactionId. */ s->transactionId = GetNewTransactionId(isSubXact); + if (!isSubXact) + XactTopTransactionId = s->transactionId; if (isSubXact) SubTransSetParent(s->transactionId, s->parent->transactionId, false); @@ -642,7 +680,16 @@ GetCurrentCommandId(bool used) { /* this is global to a transaction, not subtransaction-local */ if (used) + { + /* + * Forbid setting currentCommandIdUsed in parallel mode, because we + * have no provision for communicating this back to the master. 
We + * could relax this restriction when currentCommandIdUsed was already + * true at the start of the parallel operation. + */ + Assert(!CurrentTransactionState->parallelMode); currentCommandIdUsed = true; + } return currentCommandId; } @@ -736,6 +783,36 @@ TransactionIdIsCurrentTransactionId(TransactionId xid) return false; /* + * In parallel workers, the XIDs we must consider as current are stored + * in ParallelCurrentXids rather than the transaction-state stack. Note + * that the XIDs in this array are sorted numerically rather than + * according to transactionIdPrecedes order. + */ + if (nParallelCurrentXids > 0) + { + int low, + high; + + low = 0; + high = nParallelCurrentXids - 1; + while (low <= high) + { + int middle; + TransactionId probe; + + middle = low + (high - low) / 2; + probe = ParallelCurrentXids[middle]; + if (probe == xid) + return true; + else if (probe < xid) + low = middle + 1; + else + high = middle - 1; + } + return false; + } + + /* * We will return true for the Xid of the current subtransaction, any of * its subcommitted children, any of its parents, or any of their * previously subcommitted children. However, a transaction being aborted @@ -789,6 +866,53 @@ TransactionStartedDuringRecovery(void) } /* + * EnterParallelMode + */ +void +EnterParallelMode(void) +{ + TransactionState s = CurrentTransactionState; + + /* + * Workers synchronize transaction state at the beginning of each + * parallel operation, so we can't let the transaction state be changed + * after that point. That includes the parallel mode flag itself. + */ + Assert(!s->parallelMode); + + s->parallelMode = true; +} + +/* + * ExitParallelMode + */ +void +ExitParallelMode(void) +{ + TransactionState s = CurrentTransactionState; + + Assert(s->parallelMode); + Assert(!ParallelContextActive()); + + s->parallelMode = false; +} + +/* + * IsInParallelMode + * + * Are we in a parallel operation, as either the master or a worker? 
Check + * this to prohibit operations that change backend-local state expected to + * match across all workers. Mere caches usually don't require such a + * restriction. State modified in a strict push/pop fashion, such as the + * active snapshot stack, is often fine. + */ +bool +IsInParallelMode(void) +{ + return CurrentTransactionState->parallelMode; +} + +/* * CommandCounterIncrement */ void @@ -802,6 +926,14 @@ CommandCounterIncrement(void) */ if (currentCommandIdUsed) { + /* + * Workers synchronize transaction state at the beginning of each + * parallel operation, so we can't account for new commands after that + * point. + */ + if (IsInParallelMode()) + elog(ERROR, "cannot start commands during a parallel operation"); + currentCommandId += 1; if (currentCommandId == InvalidCommandId) { @@ -1705,6 +1837,8 @@ StartTransaction(void) s = &TopTransactionStateData; CurrentTransactionState = s; + Assert(XactTopTransactionId == InvalidTransactionId); + /* * check the current transaction state */ @@ -1834,6 +1968,9 @@ CommitTransaction(void) { TransactionState s = CurrentTransactionState; TransactionId latestXid; + bool parallel; + + parallel = (s->blockState == TBLOCK_PARALLEL_INPROGRESS); ShowTransactionState("CommitTransaction"); @@ -1845,6 +1982,10 @@ CommitTransaction(void) TransStateAsString(s->state)); Assert(s->parent == NULL); + /* If we might have parallel workers, clean them up now. */ + if (IsInParallelMode()) + AtEOXact_Parallel(true); + /* * Do pre-commit processing that involves calling user-defined code, such * as triggers. Since closing cursors could queue trigger actions, @@ -1867,7 +2008,8 @@ CommitTransaction(void) break; } - CallXactCallbacks(XACT_EVENT_PRE_COMMIT); + CallXactCallbacks(parallel ? XACT_EVENT_PARALLEL_PRE_COMMIT + : XACT_EVENT_PRE_COMMIT); /* * The remaining actions cannot call any user-defined code, so it's safe @@ -1915,9 +2057,13 @@ CommitTransaction(void) s->state = TRANS_COMMIT; /* - * Here is where we really truly commit. 
+ * Unless we're inside a parallel worker, we need to mark our XIDs as + * committed in pg_clog. This is where we durably commit. */ - latestXid = RecordTransactionCommit(); + if (parallel) + latestXid = InvalidTransactionId; + else + latestXid = RecordTransactionCommit(); TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid); @@ -1944,7 +2090,8 @@ CommitTransaction(void) * state. */ - CallXactCallbacks(XACT_EVENT_COMMIT); + CallXactCallbacks(parallel ? XACT_EVENT_PARALLEL_COMMIT + : XACT_EVENT_COMMIT); ResourceOwnerRelease(TopTransactionResourceOwner, RESOURCE_RELEASE_BEFORE_LOCKS, @@ -1992,7 +2139,8 @@ CommitTransaction(void) AtEOXact_GUC(true, 1); AtEOXact_SPI(true); AtEOXact_on_commit_actions(true); - AtEOXact_Namespace(true); + if (!parallel) + AtEOXact_Namespace(true); AtEOXact_SMgr(); AtEOXact_Files(); AtEOXact_ComboCid(); @@ -2017,6 +2165,9 @@ CommitTransaction(void) s->nChildXids = 0; s->maxChildXids = 0; + XactTopTransactionId = InvalidTransactionId; + nParallelCurrentXids = 0; + /* * done with commit processing, set current transaction state back to * default @@ -2040,6 +2191,8 @@ PrepareTransaction(void) GlobalTransaction gxact; TimestampTz prepared_at; + Assert(!IsInParallelMode()); + ShowTransactionState("PrepareTransaction"); /* @@ -2284,6 +2437,9 @@ PrepareTransaction(void) s->nChildXids = 0; s->maxChildXids = 0; + XactTopTransactionId = InvalidTransactionId; + nParallelCurrentXids = 0; + /* * done with 1st phase commit processing, set current transaction state * back to default @@ -2302,6 +2458,7 @@ AbortTransaction(void) { TransactionState s = CurrentTransactionState; TransactionId latestXid; + bool parallel; /* Prevent cancel/die interrupt while cleaning up */ HOLD_INTERRUPTS(); @@ -2350,6 +2507,7 @@ AbortTransaction(void) /* * check the current transaction state */ + parallel = (s->blockState == TBLOCK_PARALLEL_INPROGRESS); if (s->state != TRANS_INPROGRESS && s->state != TRANS_PREPARE) elog(WARNING, "AbortTransaction while in %s state", 
TransStateAsString(s->state)); @@ -2384,10 +2542,22 @@ AbortTransaction(void) AtAbort_Twophase(); /* + * If we might have parallel workers, send them all termination signals, + * and wait for them to die. + */ + if (IsInParallelMode()) + AtEOXact_Parallel(false); + + /* * Advertise the fact that we aborted in pg_clog (assuming that we got as - * far as assigning an XID to advertise). + * far as assigning an XID to advertise). But if we're inside a parallel + * worker, skip this; the user backend must be the one to write the abort + * record. */ - latestXid = RecordTransactionAbort(false); + if (parallel) + latestXid = InvalidTransactionId; + else + latestXid = RecordTransactionAbort(false); TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid); @@ -2405,7 +2575,10 @@ AbortTransaction(void) */ if (TopTransactionResourceOwner != NULL) { - CallXactCallbacks(XACT_EVENT_ABORT); + if (parallel) + CallXactCallbacks(XACT_EVENT_PARALLEL_ABORT); + else + CallXactCallbacks(XACT_EVENT_ABORT); ResourceOwnerRelease(TopTransactionResourceOwner, RESOURCE_RELEASE_BEFORE_LOCKS, @@ -2426,7 +2599,8 @@ AbortTransaction(void) AtEOXact_GUC(false, 1); AtEOXact_SPI(false); AtEOXact_on_commit_actions(false); - AtEOXact_Namespace(false); + if (!parallel) + AtEOXact_Namespace(false); AtEOXact_SMgr(); AtEOXact_Files(); AtEOXact_ComboCid(); @@ -2478,6 +2652,10 @@ CleanupTransaction(void) s->childXids = NULL; s->nChildXids = 0; s->maxChildXids = 0; + s->parallelMode = false; + + XactTopTransactionId = InvalidTransactionId; + nParallelCurrentXids = 0; /* * done with abort processing, set current transaction state back to @@ -2531,6 +2709,7 @@ StartTransactionCommand(void) /* These cases are invalid. 
*/ case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -2566,11 +2745,13 @@ CommitTransactionCommand(void) switch (s->blockState) { /* - * This shouldn't happen, because it means the previous + * These shouldn't happen. TBLOCK_DEFAULT means the previous * StartTransactionCommand didn't set the STARTED state - * appropriately. + * appropriately, while TBLOCK_PARALLEL_INPROGRESS should be ended + * by EndParallelWorkerTransaction(), not this function. */ case TBLOCK_DEFAULT: + case TBLOCK_PARALLEL_INPROGRESS: elog(FATAL, "CommitTransactionCommand: unexpected state %s", BlockStateAsString(s->blockState)); break; @@ -2852,6 +3033,7 @@ AbortCurrentTransaction(void) * ABORT state. We will stay in ABORT until we get a ROLLBACK. */ case TBLOCK_INPROGRESS: + case TBLOCK_PARALLEL_INPROGRESS: AbortTransaction(); s->blockState = TBLOCK_ABORT; /* CleanupTransaction happens when we exit TBLOCK_ABORT_END */ @@ -3241,6 +3423,7 @@ BeginTransactionBlock(void) * Already a transaction block in progress. */ case TBLOCK_INPROGRESS: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBINPROGRESS: case TBLOCK_ABORT: case TBLOCK_SUBABORT: @@ -3418,6 +3601,16 @@ EndTransactionBlock(void) result = true; break; + /* + * The user issued a COMMIT that somehow ran inside a parallel + * worker. We can't cope with that. + */ + case TBLOCK_PARALLEL_INPROGRESS: + ereport(FATAL, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot commit during a parallel operation"))); + break; + /* These cases are invalid. */ case TBLOCK_DEFAULT: case TBLOCK_BEGIN: @@ -3511,6 +3704,16 @@ UserAbortTransactionBlock(void) s->blockState = TBLOCK_ABORT_PENDING; break; + /* + * The user issued an ABORT that somehow ran inside a parallel + * worker. We can't cope with that. 
+ */ + case TBLOCK_PARALLEL_INPROGRESS: + ereport(FATAL, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot abort during a parallel operation"))); + break; + /* These cases are invalid. */ case TBLOCK_DEFAULT: case TBLOCK_BEGIN: @@ -3540,6 +3743,18 @@ DefineSavepoint(char *name) { TransactionState s = CurrentTransactionState; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for new subtransactions after that + * point. (Note that this check will certainly error out if s->blockState + * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case + * below.) + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot define savepoints during a parallel operation"))); + switch (s->blockState) { case TBLOCK_INPROGRESS: @@ -3560,6 +3775,7 @@ DefineSavepoint(char *name) case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -3594,6 +3810,18 @@ ReleaseSavepoint(List *options) ListCell *cell; char *name = NULL; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for transaction state change after that + * point. (Note that this check will certainly error out if s->blockState + * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case + * below.) 
+ */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot release savepoints during a parallel operation"))); + switch (s->blockState) { /* @@ -3617,6 +3845,7 @@ ReleaseSavepoint(List *options) case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -3694,6 +3923,18 @@ RollbackToSavepoint(List *options) ListCell *cell; char *name = NULL; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for transaction state change after that + * point. (Note that this check will certainly error out if s->blockState + * is TBLOCK_PARALLEL_INPROGRESS, so we can treat that as an invalid case + * below.) + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot rollback to savepoints during a parallel operation"))); + switch (s->blockState) { /* @@ -3718,6 +3959,7 @@ RollbackToSavepoint(List *options) case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -3806,6 +4048,20 @@ BeginInternalSubTransaction(char *name) { TransactionState s = CurrentTransactionState; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for new subtransactions after that point. + * We might be able to make an exception for the type of subtransaction + * established by this function, which is typically used in contexts where + * we're going to release or roll back the subtransaction before proceeding + * further, so that no enduring change to the transaction state occurs. + * For now, however, we prohibit this case along with all the others. 
+ */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot start subtransactions during a parallel operation"))); + switch (s->blockState) { case TBLOCK_STARTED: @@ -3828,6 +4084,7 @@ BeginInternalSubTransaction(char *name) /* These cases are invalid. */ case TBLOCK_DEFAULT: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_SUBRELEASE: case TBLOCK_SUBCOMMIT: @@ -3860,6 +4117,18 @@ ReleaseCurrentSubTransaction(void) { TransactionState s = CurrentTransactionState; + /* + * Workers synchronize transaction state at the beginning of each parallel + * operation, so we can't account for commit of subtransactions after that + * point. This should not happen anyway. Code calling this would + * typically have called BeginInternalSubTransaction() first, failing + * there. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot commit subtransactions during a parallel operation"))); + if (s->blockState != TBLOCK_SUBINPROGRESS) elog(ERROR, "ReleaseCurrentSubTransaction: unexpected state %s", BlockStateAsString(s->blockState)); @@ -3882,6 +4151,14 @@ RollbackAndReleaseCurrentSubTransaction(void) { TransactionState s = CurrentTransactionState; + /* + * Unlike ReleaseCurrentSubTransaction(), this is nominally permitted + * during parallel operations. That's because we may be in the master, + * recovering from an error thrown while we were in parallel mode. We + * won't reach here in a worker, because BeginInternalSubTransaction() + * will have failed. 
+ */ + switch (s->blockState) { /* Must be in a subtransaction */ @@ -3893,6 +4170,7 @@ RollbackAndReleaseCurrentSubTransaction(void) case TBLOCK_DEFAULT: case TBLOCK_STARTED: case TBLOCK_BEGIN: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBBEGIN: case TBLOCK_INPROGRESS: case TBLOCK_END: @@ -3968,6 +4246,7 @@ AbortOutOfAnyTransaction(void) case TBLOCK_STARTED: case TBLOCK_BEGIN: case TBLOCK_INPROGRESS: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_END: case TBLOCK_ABORT_PENDING: case TBLOCK_PREPARE: @@ -4059,6 +4338,7 @@ TransactionBlockStatusCode(void) case TBLOCK_BEGIN: case TBLOCK_SUBBEGIN: case TBLOCK_INPROGRESS: + case TBLOCK_PARALLEL_INPROGRESS: case TBLOCK_SUBINPROGRESS: case TBLOCK_END: case TBLOCK_SUBRELEASE: @@ -4157,6 +4437,13 @@ CommitSubTransaction(void) elog(WARNING, "CommitSubTransaction while in %s state", TransStateAsString(s->state)); + /* Exit from parallel mode, if necessary. */ + if (IsInParallelMode()) + { + AtEOSubXact_Parallel(true, s->subTransactionId); + s->parallelMode = false; + } + /* Pre-commit processing goes here */ CallSubXactCallbacks(SUBXACT_EVENT_PRE_COMMIT_SUB, s->subTransactionId, @@ -4315,6 +4602,13 @@ AbortSubTransaction(void) */ SetUserIdAndSecContext(s->prevUser, s->prevSecContext); + /* Exit from parallel mode, if necessary. */ + if (IsInParallelMode()) + { + AtEOSubXact_Parallel(false, s->subTransactionId); + s->parallelMode = false; + } + /* * We can skip all this stuff if the subxact failed before creating a * ResourceOwner... @@ -4455,6 +4749,7 @@ PushTransaction(void) s->blockState = TBLOCK_SUBBEGIN; GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext); s->prevXactReadOnly = XactReadOnly; + s->parallelMode = false; CurrentTransactionState = s; @@ -4502,6 +4797,134 @@ PopTransaction(void) } /* + * EstimateTransactionStateSpace + * Estimate the amount of space that will be needed by + * SerializeTransactionState. 
It would be OK to overestimate slightly, + * but it's simple for us to work out the precise value, so we do. + */ +Size +EstimateTransactionStateSpace(void) +{ + TransactionState s; + Size nxids = 3; /* top XID, current XID, count of XIDs */ + + for (s = CurrentTransactionState; s != NULL; s = s->parent) + { + if (TransactionIdIsValid(s->transactionId)) + nxids = add_size(nxids, 1); + nxids = add_size(nxids, s->nChildXids); + } + + nxids = add_size(nxids, nParallelCurrentXids); + return mul_size(nxids, sizeof(TransactionId)); +} + +/* + * SerializeTransactionState + * Write out relevant details of our transaction state that will be + * needed by a parallel worker. + * + * Currently, the only information we attempt to save and restore here is + * the XIDs associated with this transaction. The first eight bytes of the + * result contain the XID of the top-level transaction and the XID of the + * current transaction (or, in each case, InvalidTransactionId if none). + * The next 4 bytes contain a count of how many additional XIDs follow; + * this is followed by all of those XIDs one after another. We emit the XIDs + * in sorted order for the convenience of the receiving process. + */ +void +SerializeTransactionState(Size maxsize, char *start_address) +{ + TransactionState s; + Size nxids = 0; + Size i = 0; + TransactionId *workspace; + TransactionId *result = (TransactionId *) start_address; + + Assert(maxsize >= 3 * sizeof(TransactionId)); + result[0] = XactTopTransactionId; + result[1] = CurrentTransactionState->transactionId; + + /* + * If we're running in a parallel worker and launching a parallel worker + * of our own, we can just pass along the information that was passed to + * us. 
+ */ + if (nParallelCurrentXids > 0) + { + Assert((nParallelCurrentXids + 3) * sizeof(TransactionId) <= maxsize); /* 3 header words precede the XID array */ + result[2] = nParallelCurrentXids; + memcpy(&result[3], ParallelCurrentXids, + nParallelCurrentXids * sizeof(TransactionId)); + return; + } + + /* + * OK, we need to generate a sorted list of XIDs that our workers + * should view as current. First, figure out how many there are. + */ + for (s = CurrentTransactionState; s != NULL; s = s->parent) + { + if (TransactionIdIsValid(s->transactionId)) + nxids = add_size(nxids, 1); + nxids = add_size(nxids, s->nChildXids); + } + Assert((nxids + 3) * sizeof(TransactionId) <= maxsize); /* result[0..2] are written as well as the nxids entries */ + + /* Copy them to our scratch space. */ + workspace = palloc(nxids * sizeof(TransactionId)); + for (s = CurrentTransactionState; s != NULL; s = s->parent) + { + if (TransactionIdIsValid(s->transactionId)) + workspace[i++] = s->transactionId; + memcpy(&workspace[i], s->childXids, + s->nChildXids * sizeof(TransactionId)); + i += s->nChildXids; + } + Assert(i == nxids); + + /* Sort them. */ + qsort(workspace, nxids, sizeof(TransactionId), xidComparator); + + /* Copy data into output area. */ + result[2] = (TransactionId) nxids; + memcpy(&result[3], workspace, nxids * sizeof(TransactionId)); +} + +/* + * StartParallelWorkerTransaction + * Start a parallel worker transaction, restoring the relevant + * transaction state serialized by SerializeTransactionState. + */ +void +StartParallelWorkerTransaction(char *tstatespace) +{ + TransactionId *tstate = (TransactionId *) tstatespace; + + Assert(CurrentTransactionState->blockState == TBLOCK_DEFAULT); + StartTransaction(); + + XactTopTransactionId = tstate[0]; + CurrentTransactionState->transactionId = tstate[1]; + nParallelCurrentXids = (int) tstate[2]; + ParallelCurrentXids = &tstate[3]; + + CurrentTransactionState->blockState = TBLOCK_PARALLEL_INPROGRESS; +} + +/* + * EndParallelWorkerTransaction + * End a parallel worker transaction. 
+ */ +void +EndParallelWorkerTransaction(void) +{ + Assert(CurrentTransactionState->blockState == TBLOCK_PARALLEL_INPROGRESS); + CommitTransaction(); + CurrentTransactionState->blockState = TBLOCK_DEFAULT; +} + +/* * ShowTransactionState * Debug support */ @@ -4571,6 +4994,8 @@ BlockStateAsString(TBlockState blockState) return "BEGIN"; case TBLOCK_INPROGRESS: return "INPROGRESS"; + case TBLOCK_PARALLEL_INPROGRESS: + return "PARALLEL_INPROGRESS"; case TBLOCK_END: return "END"; case TBLOCK_ABORT: diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index ed2b05a..5398b70 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -476,7 +476,7 @@ BootstrapModeMain(void) */ InitProcess(); - InitPostgres(NULL, InvalidOid, NULL, NULL); + InitPostgres(NULL, InvalidOid, NULL, InvalidOid, NULL); /* Initialize stuff for bootstrap-file processing */ for (i = 0; i < MAXATTR; i++) diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 8b1c727..0cc35b4 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -914,9 +914,10 @@ DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed) { Assert(rel); - /* check read-only transaction */ + /* check read-only transaction and parallel mode */ if (XactReadOnly && !rel->rd_islocaltemp) PreventCommandIfReadOnly("COPY FROM"); + PreventCommandIfParallelMode("COPY FROM"); cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program, stmt->attlist, stmt->options); diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 811e1d4..d9b0aed 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -551,6 +551,13 @@ nextval_internal(Oid relid) if (!seqrel->rd_islocaltemp) PreventCommandIfReadOnly("nextval()"); + /* + * Forbid this during parallel operation because, to make it work, + * the cooperating backends would need to share the backend-local cached + * sequence 
information. Currently, we don't support that. + */ + PreventCommandIfParallelMode("nextval()"); + if (elm->last != elm->cached) /* some numbers were cached */ { Assert(elm->last_valid); @@ -838,6 +845,13 @@ do_setval(Oid relid, int64 next, bool iscalled) if (!seqrel->rd_islocaltemp) PreventCommandIfReadOnly("setval()"); + /* + * Forbid this during parallel operation because, to make it work, + * the cooperating backends would need to share the backend-local cached + * sequence information. Currently, we don't support that. + */ + PreventCommandIfParallelMode("setval()"); + /* lock page' buffer and read tuple */ seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple); diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 8c799d3..9223f5e 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -135,8 +135,20 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) /* * If the transaction is read-only, we need to check if any writes are * planned to non-temporary tables. EXPLAIN is considered read-only. + * + * Don't allow writes in parallel mode. Supporting UPDATE and DELETE would + * require (a) storing the combocid hash in shared memory, rather than + * synchronizing it just once at the start of parallelism, and (b) an + * alternative to heap_update()'s reliance on xmax for mutual exclusion. + * INSERT may have no such troubles, but we forbid it to simplify the + * checks. + * + * We have lower-level defenses in CommandCounterIncrement and elsewhere + * against performing unsafe operations in parallel mode, but this gives + * a more user-friendly error message. */ - if (XactReadOnly && !(eflags & EXEC_FLAG_EXPLAIN_ONLY)) + if ((XactReadOnly || IsInParallelMode()) && + !(eflags & EXEC_FLAG_EXPLAIN_ONLY)) ExecCheckXactReadOnly(queryDesc->plannedstmt); /* @@ -679,18 +691,23 @@ ExecCheckRTEPerms(RangeTblEntry *rte) } /* - * Check that the query does not imply any writes to non-temp tables. 
+ * Check that the query does not imply any writes to non-temp tables; + * unless we're in parallel mode, in which case don't even allow writes + * to temp tables. * * Note: in a Hot Standby slave this would need to reject writes to temp - * tables as well; but an HS slave can't have created any temp tables - * in the first place, so no need to check that. + * tables just as we do in parallel mode; but an HS slave can't have created + * any temp tables in the first place, so no need to check that. */ static void ExecCheckXactReadOnly(PlannedStmt *plannedstmt) { ListCell *l; - /* Fail if write permissions are requested on any non-temp table */ + /* + * Fail if write permissions are requested in parallel mode for + * table (temp or non-temp), otherwise fail for any non-temp table. + */ foreach(l, plannedstmt->rtable) { RangeTblEntry *rte = (RangeTblEntry *) lfirst(l); @@ -701,6 +718,8 @@ ExecCheckXactReadOnly(PlannedStmt *plannedstmt) if ((rte->requiredPerms & (~ACL_SELECT)) == 0) continue; + PreventCommandIfParallelMode(CreateCommandTag((Node *) plannedstmt)); + if (isTempNamespace(get_rel_namespace(rte->relid))) continue; diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c index 4d11260..62b615a 100644 --- a/src/backend/executor/functions.c +++ b/src/backend/executor/functions.c @@ -513,6 +513,9 @@ init_execution_state(List *queryTree_list, errmsg("%s is not allowed in a non-volatile function", CreateCommandTag(stmt)))); + if (IsInParallelMode() && !CommandIsReadOnly(stmt)) + PreventCommandIfParallelMode(CreateCommandTag(stmt)); + /* OK, build the execution_state for this query */ newes = (execution_state *) palloc(sizeof(execution_state)); if (preves) diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index cfa4a24..72d55c7 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -23,6 +23,7 @@ #include "commands/trigger.h" #include "executor/executor.h" #include "executor/spi_priv.h" +#include 
"miscadmin.h" #include "tcop/pquery.h" #include "tcop/utility.h" #include "utils/builtins.h" @@ -1322,13 +1323,14 @@ SPI_cursor_open_internal(const char *name, SPIPlanPtr plan, } /* - * If told to be read-only, we'd better check for read-only queries. This - * can't be done earlier because we need to look at the finished, planned - * queries. (In particular, we don't want to do it between GetCachedPlan - * and PortalDefineQuery, because throwing an error between those steps - * would result in leaking our plancache refcount.) + * If told to be read-only, or in parallel mode, verify that this query + * is in fact read-only. This can't be done earlier because we need to + * look at the finished, planned queries. (In particular, we don't want + * to do it between GetCachedPlan and PortalDefineQuery, because throwing + * an error between those steps would result in leaking our plancache + * refcount.) */ - if (read_only) + if (read_only || IsInParallelMode()) { ListCell *lc; @@ -1337,11 +1339,16 @@ SPI_cursor_open_internal(const char *name, SPIPlanPtr plan, Node *pstmt = (Node *) lfirst(lc); if (!CommandIsReadOnly(pstmt)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - /* translator: %s is a SQL statement name */ - errmsg("%s is not allowed in a non-volatile function", - CreateCommandTag(pstmt)))); + { + if (read_only) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + /* translator: %s is a SQL statement name */ + errmsg("%s is not allowed in a non-volatile function", + CreateCommandTag(pstmt)))); + else + PreventCommandIfParallelMode(CreateCommandTag(pstmt)); + } } } @@ -2129,6 +2136,9 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, errmsg("%s is not allowed in a non-volatile function", CreateCommandTag(stmt)))); + if (IsInParallelMode() && !CommandIsReadOnly(stmt)) + PreventCommandIfParallelMode(CreateCommandTag(stmt)); + /* * If not read-only mode, advance the command counter before each * command and update the snapshot. 
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c index 6e6b429..b07978c 100644 --- a/src/backend/libpq/pqmq.c +++ b/src/backend/libpq/pqmq.c @@ -16,12 +16,15 @@ #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "libpq/pqmq.h" +#include "miscadmin.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" static shm_mq *pq_mq; static shm_mq_handle *pq_mq_handle; static bool pq_mq_busy = false; +static pid_t pq_mq_parallel_master_pid = 0; +static BackendId pq_mq_parallel_master_backend_id = InvalidBackendId; static void mq_comm_reset(void); static int mq_flush(void); @@ -57,6 +60,18 @@ pq_redirect_to_shm_mq(shm_mq *mq, shm_mq_handle *mqh) FrontendProtocol = PG_PROTOCOL_LATEST; } +/* + * Arrange to SendProcSignal() to the parallel master each time we transmit + * message data via the shm_mq. + */ +void +pq_set_parallel_master(pid_t pid, BackendId backend_id) +{ + Assert(PqCommMethods == &PqCommMqMethods); + pq_mq_parallel_master_pid = pid; + pq_mq_parallel_master_backend_id = backend_id; +} + static void mq_comm_reset(void) { @@ -120,7 +135,23 @@ mq_putmessage(char msgtype, const char *s, size_t len) iov[1].len = len; Assert(pq_mq_handle != NULL); - result = shm_mq_sendv(pq_mq_handle, iov, 2, false); + + for (;;) + { + result = shm_mq_sendv(pq_mq_handle, iov, 2, true); + + if (pq_mq_parallel_master_pid != 0) + SendProcSignal(pq_mq_parallel_master_pid, + PROCSIG_PARALLEL_MESSAGE, + pq_mq_parallel_master_backend_id); + + if (result != SHM_MQ_WOULD_BLOCK) + break; + + WaitLatch(&MyProc->procLatch, WL_LATCH_SET, 0); + CHECK_FOR_INTERRUPTS(); + ResetLatch(&MyProc->procLatch); + } pq_mq_busy = false; diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c index 675f985..637749a 100644 --- a/src/backend/postmaster/autovacuum.c +++ b/src/backend/postmaster/autovacuum.c @@ -471,7 +471,7 @@ AutoVacLauncherMain(int argc, char *argv[]) InitProcess(); #endif - InitPostgres(NULL, InvalidOid, NULL, NULL); + InitPostgres(NULL,
InvalidOid, NULL, InvalidOid, NULL); SetProcessingMode(NormalProcessing); @@ -1664,7 +1664,7 @@ AutoVacWorkerMain(int argc, char *argv[]) * Note: if we have selected a just-deleted database (due to using * stale stats info), we'll fail and exit here. */ - InitPostgres(NULL, dbid, NULL, dbname); + InitPostgres(NULL, dbid, NULL, InvalidOid, dbname); SetProcessingMode(NormalProcessing); set_ps_display(dbname, false); ereport(DEBUG1, diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 5106f52..451f814 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -5305,7 +5305,30 @@ BackgroundWorkerInitializeConnection(char *dbname, char *username) (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("database connection requirement not indicated during registration"))); - InitPostgres(dbname, InvalidOid, username, NULL); + InitPostgres(dbname, InvalidOid, username, InvalidOid, NULL); + + /* it had better not gotten out of "init" mode yet */ + if (!IsInitProcessingMode()) + ereport(ERROR, + (errmsg("invalid processing mode in background worker"))); + SetProcessingMode(NormalProcessing); +} + +/* + * Connect background worker to a database using OIDs. + */ +void +BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid) +{ + BackgroundWorker *worker = MyBgworkerEntry; + + /* XXX is this the right errcode? 
*/ + if (!(worker->bgw_flags & BGWORKER_BACKEND_DATABASE_CONNECTION)) + ereport(FATAL, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("database connection requirement not indicated during registration"))); + + InitPostgres(NULL, dboid, NULL, useroid, NULL); /* it had better not gotten out of "init" mode yet */ if (!IsInitProcessingMode()) diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index d953545..7b7b57a 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -1686,6 +1686,50 @@ ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid) } /* + * ProcArrayInstallRestoredXmin -- install restored xmin into MyPgXact->xmin + * + * This is like ProcArrayInstallImportedXmin, but we have a pointer to the + * PGPROC of the transaction from which we imported the snapshot, rather than + * an XID. + * + * Returns TRUE if successful, FALSE if source xact is no longer running. + */ +bool +ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc) +{ + bool result = false; + TransactionId xid; + volatile PGXACT *pgxact; + + Assert(TransactionIdIsNormal(xmin)); + Assert(proc != NULL); + + /* Get lock so source xact can't end while we're doing this */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + + pgxact = &allPgXact[proc->pgprocno]; + + /* + * Be certain that the referenced PGPROC has an advertised xmin which + * is no later than the one we're installing, so that the system-wide + * xmin can't go backwards. Also, make sure it's running in the same + * database, so that the per-database xmin cannot go backwards. 
+ */ + xid = pgxact->xmin; /* fetch just once */ + if (proc->databaseId == MyDatabaseId && + TransactionIdIsNormal(xid) && + TransactionIdPrecedesOrEquals(xid, xmin)) + { + MyPgXact->xmin = TransactionXmin = xmin; + result = true; + } + + LWLockRelease(ProcArrayLock); + + return result; +} + +/* * GetRunningTransactionData -- returns information about running transactions. * * Similar to GetSnapshotData but returns more information. We include diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index cd9a287..0678678 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -17,6 +17,7 @@ #include #include +#include "access/parallel.h" #include "commands/async.h" #include "miscadmin.h" #include "storage/latch.h" @@ -274,6 +275,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_NOTIFY_INTERRUPT)) HandleNotifyInterrupt(); + if (CheckProcSignal(PROCSIG_PARALLEL_MESSAGE)) + HandleParallelMessageInterrupt(true); + if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_DATABASE)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_DATABASE); diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index e783955..b35cbbd 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -1653,6 +1653,14 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, Assert(!RecoveryInProgress()); + /* + * Since all parts of a serializable transaction must use the same + * snapshot, it is too late to establish one after a parallel operation + * has begun. 
+ */ + if (IsInParallelMode()) + elog(ERROR, "cannot establish serializable snapshot during a parallel operation"); + proc = MyProc; Assert(proc != NULL); GET_VXID_FROM_PGPROC(vxid, *proc); diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index cc62b2c..4285748 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -37,6 +37,7 @@ #include "rusagestub.h" #endif +#include "access/parallel.h" #include "access/printtup.h" #include "access/xact.h" #include "catalog/pg_type.h" @@ -2968,7 +2969,8 @@ ProcessInterrupts(void) errmsg("canceling statement due to user request"))); } } - /* If we get here, do nothing (probably, QueryCancelPending was reset) */ + if (ParallelMessagePending) + HandleParallelMessageInterrupt(false); } @@ -3704,7 +3706,7 @@ PostgresMain(int argc, char *argv[], * it inside InitPostgres() instead. In particular, anything that * involves database access should be there, not here. */ - InitPostgres(dbname, InvalidOid, username, NULL); + InitPostgres(dbname, InvalidOid, username, InvalidOid, NULL); /* * If the PostmasterContext is still around, recycle the space; we don't diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 71580e8..623f985 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -128,14 +128,15 @@ CommandIsReadOnly(Node *parsetree) static void check_xact_readonly(Node *parsetree) { - if (!XactReadOnly) + /* Only perform the check if we have a reason to do so. */ + if (!XactReadOnly && !IsInParallelMode()) return; /* * Note: Commands that need to do more complicated checking are handled * elsewhere, in particular COPY and plannable statements do their own - * checking. However they should all call PreventCommandIfReadOnly to - * actually throw the error. + * checking. However they should all call PreventCommandIfReadOnly + * or PreventCommandIfParallelMode to actually throw the error. 
*/ switch (nodeTag(parsetree)) @@ -207,6 +208,7 @@ check_xact_readonly(Node *parsetree) case T_ImportForeignSchemaStmt: case T_SecLabelStmt: PreventCommandIfReadOnly(CreateCommandTag(parsetree)); + PreventCommandIfParallelMode(CreateCommandTag(parsetree)); break; default: /* do nothing */ @@ -232,6 +234,24 @@ PreventCommandIfReadOnly(const char *cmdname) } /* + * PreventCommandIfParallelMode: throw error if current (sub)transaction is + * in parallel mode. + * + * This is useful mainly to ensure consistency of the error message wording; + * most callers have checked IsInParallelMode() for themselves. + */ +void +PreventCommandIfParallelMode(const char *cmdname) +{ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + /* translator: %s is name of a SQL command, eg CREATE */ + errmsg("cannot execute %s during a parallel operation", + cmdname))); +} + +/* * PreventCommandDuringRecovery: throw error if RecoveryInProgress * * The majority of operations that are unsafe in a Hot Standby slave @@ -630,6 +650,7 @@ standard_ProcessUtility(Node *parsetree, case T_ClusterStmt: /* we choose to allow this during "read only" transactions */ PreventCommandDuringRecovery("CLUSTER"); + /* forbidden in parallel mode due to CommandIsReadOnly */ cluster((ClusterStmt *) parsetree, isTopLevel); break; @@ -640,6 +661,7 @@ standard_ProcessUtility(Node *parsetree, /* we choose to allow this during "read only" transactions */ PreventCommandDuringRecovery((stmt->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE"); + /* forbidden in parallel mode due to CommandIsReadOnly */ vacuum(stmt, InvalidOid, true, NULL, false, isTopLevel); } break; @@ -716,6 +738,7 @@ standard_ProcessUtility(Node *parsetree, * outside a transaction block is presumed to be user error. 
*/ RequireTransactionChain(isTopLevel, "LOCK TABLE"); + /* forbidden in parallel mode due to CommandIsReadOnly */ LockTableCommand((LockStmt *) parsetree); break; @@ -747,6 +770,7 @@ standard_ProcessUtility(Node *parsetree, /* we choose to allow this during "read only" transactions */ PreventCommandDuringRecovery("REINDEX"); + /* forbidden in parallel mode due to CommandIsReadOnly */ switch (stmt->kind) { case REINDEX_OBJECT_INDEX: diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index 8fccb4c..a045062 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -350,11 +350,10 @@ has_rolreplication(Oid roleid) * Initialize user identity during normal backend startup */ void -InitializeSessionUserId(const char *rolename) +InitializeSessionUserId(const char *rolename, Oid roleid) { HeapTuple roleTup; Form_pg_authid rform; - Oid roleid; /* * Don't do scans if we're bootstrapping, none of the system catalogs @@ -365,7 +364,10 @@ InitializeSessionUserId(const char *rolename) /* call only once */ AssertState(!OidIsValid(AuthenticatedUserId)); - roleTup = SearchSysCache1(AUTHNAME, PointerGetDatum(rolename)); + if (rolename != NULL) + roleTup = SearchSysCache1(AUTHNAME, PointerGetDatum(rolename)); + else + roleTup = SearchSysCache1(AUTHOID, ObjectIdGetDatum(roleid)); if (!HeapTupleIsValid(roleTup)) ereport(FATAL, (errcode(ERRCODE_INVALID_AUTHORIZATION_SPECIFICATION), diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index c348034..8f6d517 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -523,6 +523,9 @@ BaseInit(void) * name can be returned to the caller in out_dbname. If out_dbname isn't * NULL, it must point to a buffer of size NAMEDATALEN. * + * Similarly, the username can be passed by name, using the username parameter, + * or by OID using the useroid parameter. + * * In bootstrap mode no parameters are used. 
The autovacuum launcher process * doesn't use any parameters either, because it only goes far enough to be * able to read pg_database; it doesn't connect to any particular database. @@ -537,7 +540,7 @@ BaseInit(void) */ void InitPostgres(const char *in_dbname, Oid dboid, const char *username, - char *out_dbname) + Oid useroid, char *out_dbname) { bool bootstrap = IsBootstrapProcessingMode(); bool am_superuser; @@ -692,18 +695,18 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("no roles are defined in this database system"), errhint("You should immediately run CREATE USER \"%s\" SUPERUSER;.", - username))); + username != NULL ? username : "postgres"))); } else if (IsBackgroundWorker) { - if (username == NULL) + if (username == NULL && !OidIsValid(useroid)) { InitializeSessionUserIdStandalone(); am_superuser = true; } else { - InitializeSessionUserId(username); + InitializeSessionUserId(username, useroid); am_superuser = superuser(); } } @@ -712,7 +715,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, /* normal multiuser case */ Assert(MyProcPort != NULL); PerformAuthentication(MyProcPort); - InitializeSessionUserId(username); + InitializeSessionUserId(username, useroid); am_superuser = superuser(); } diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 77c3494..6f2a571 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -5656,6 +5656,20 @@ set_config_option(const char *name, const char *value, elevel = ERROR; } + /* + * GUC_ACTION_SAVE changes are acceptable during a parallel operation, + * because the current worker will also pop the change. We're probably + * dealing with a function having a proconfig entry. Only the function's + * body should observe the change, and peer workers do not share in the + * execution of a function call started by this worker. 
+ * + * Other changes might need to affect other workers, so forbid them. + */ + if (IsInParallelMode() && changeVal && action != GUC_ACTION_SAVE) + ereport(elevel, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot set parameters during a parallel operation"))); + record = find_option(name, true, elevel); if (record == NULL) { @@ -6929,6 +6943,15 @@ ExecSetVariableStmt(VariableSetStmt *stmt, bool isTopLevel) { GucAction action = stmt->is_local ? GUC_ACTION_LOCAL : GUC_ACTION_SET; + /* + * Workers synchronize these parameters at the start of the parallel + * operation; then, we block SET during the operation. + */ + if (IsInParallelMode()) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TRANSACTION_STATE), + errmsg("cannot set parameters during a parallel operation"))); + switch (stmt->kind) { case VAR_SET_VALUE: diff --git a/src/backend/utils/time/combocid.c b/src/backend/utils/time/combocid.c index ea7a905..33b0b52 100644 --- a/src/backend/utils/time/combocid.c +++ b/src/backend/utils/time/combocid.c @@ -44,6 +44,7 @@ #include "miscadmin.h" #include "access/htup_details.h" #include "access/xact.h" +#include "storage/shmem.h" #include "utils/combocid.h" #include "utils/hsearch.h" #include "utils/memutils.h" @@ -286,3 +287,76 @@ GetRealCmax(CommandId combocid) Assert(combocid < usedComboCids); return comboCids[combocid].cmax; } + +/* + * Estimate the amount of space required to serialize the current ComboCID + * state. + */ +Size +EstimateComboCIDStateSpace(void) +{ + Size size; + + /* Add space required for saving usedComboCids */ + size = sizeof(int); + + /* Add space required for saving the combocids key */ + size = add_size(size, mul_size(sizeof(ComboCidKeyData), usedComboCids)); + + return size; +} + +/* + * Serialize the ComboCID state into the memory, beginning at start_address. + * maxsize should be at least as large as the value returned by + * EstimateComboCIDStateSpace. 
+ */ +void +SerializeComboCIDState(Size maxsize, char *start_address) +{ + char *endptr; + + /* If maxsize is too small, throw an error before writing anything. */ + endptr = start_address + sizeof(int) + + (sizeof(ComboCidKeyData) * usedComboCids); + if (endptr < start_address || endptr > start_address + maxsize) + elog(ERROR, "not enough space to serialize ComboCID state"); + + /* First, we store the number of currently-existing ComboCIDs. */ + * (int *) start_address = usedComboCids; + + /* Now, copy the actual cmin/cmax pairs. */ + memcpy(start_address + sizeof(int), comboCids, + (sizeof(ComboCidKeyData) * usedComboCids)); +} + +/* + * Read the ComboCID state at the specified address and initialize this + * backend with the same ComboCIDs. This is only valid in a backend that + * currently has no ComboCIDs (and only makes sense if the transaction state + * is serialized and restored as well). + */ +void +RestoreComboCIDState(char *comboCIDstate) +{ + int num_elements; + ComboCidKeyData *keydata; + int i; + CommandId cid; + + Assert(!comboCids || !comboHash); + + /* First, we retrieve the number of ComboCIDs that were serialized. */ + num_elements = * (int *) comboCIDstate; + keydata = (ComboCidKeyData *) (comboCIDstate + sizeof(int)); + + /* Use GetComboCommandId to restore each ComboCID. */ + for (i = 0; i < num_elements; i++) + { + cid = GetComboCommandId(keydata[i].cmin, keydata[i].cmax); + + /* Verify that we got the expected answer. */ + if (cid != i) + elog(ERROR, "unexpected command ID while restoring combo CIDs"); + } +} diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c index d601efe..bc071aa 100644 --- a/src/backend/utils/time/snapmgr.c +++ b/src/backend/utils/time/snapmgr.c @@ -155,6 +155,22 @@ static Snapshot CopySnapshot(Snapshot snapshot); static void FreeSnapshot(Snapshot snapshot); static void SnapshotResetXmin(void); +/* + * Snapshot fields to be serialized.
+ * + * Only these fields need to be sent to the cooperating backend; the + * remaining ones can (and must) be set by the receiver upon restore. + */ +typedef struct SerializedSnapshotData +{ + TransactionId xmin; + TransactionId xmax; + uint32 xcnt; + int32 subxcnt; + bool suboverflowed; + bool takenDuringRecovery; + CommandId curcid; +} SerializedSnapshotData; /* * GetTransactionSnapshot @@ -186,6 +202,10 @@ GetTransactionSnapshot(void) Assert(RegisteredSnapshots == 0); Assert(FirstXactSnapshot == NULL); + if (IsInParallelMode()) + elog(ERROR, + "cannot take query snapshot during a parallel operation"); + /* * In transaction-snapshot mode, the first snapshot must live until * end of xact regardless of what the caller does with it, so we must @@ -237,6 +257,13 @@ Snapshot GetLatestSnapshot(void) { /* + * We might be able to relax this, but nothing that could otherwise work + * needs it. + */ + if (IsInParallelMode()) + elog(ERROR, "cannot update SecondarySnapshot during a parallel operation"); + + /* * So far there are no cases requiring support for GetLatestSnapshot() * during logical decoding, but it wouldn't be hard to add if required. */ @@ -345,7 +372,8 @@ SnapshotSetCommandId(CommandId curcid) * in GetTransactionSnapshot. */ static void -SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid) +SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid, + PGPROC *sourceproc) { /* Caller should have checked this already */ Assert(!FirstSnapshotSet); @@ -392,7 +420,15 @@ SetTransactionSnapshot(Snapshot sourcesnap, TransactionId sourcexid) * doesn't seem worth contorting the logic here to avoid two calls, * especially since it's not clear that predicate.c *must* do this.
*/ - if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid)) + if (sourceproc != NULL) + { + if (!ProcArrayInstallRestoredXmin(CurrentSnapshot->xmin, sourceproc)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not import the requested snapshot"), + errdetail("The source transaction is not running anymore."))); + } + else if (!ProcArrayInstallImportedXmin(CurrentSnapshot->xmin, sourcexid)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("could not import the requested snapshot"), @@ -548,11 +584,24 @@ PushCopiedSnapshot(Snapshot snapshot) void UpdateActiveSnapshotCommandId(void) { + CommandId save_curcid, curcid; Assert(ActiveSnapshot != NULL); Assert(ActiveSnapshot->as_snap->active_count == 1); Assert(ActiveSnapshot->as_snap->regd_count == 0); - ActiveSnapshot->as_snap->curcid = GetCurrentCommandId(false); + /* + * Don't allow modification of the active snapshot during parallel + * operation. We share the snapshot to worker backends at beginning of + * parallel operation, so any change to snapshot can lead to + * inconsistencies. We have other defenses against + * CommandCounterIncrement, but there are a few places that call this + * directly, so we put an additional guard here. 
+ */ + save_curcid = ActiveSnapshot->as_snap->curcid; + curcid = GetCurrentCommandId(false); + if (IsInParallelMode() && save_curcid != curcid) + elog(ERROR, "cannot modify commandid in active snapshot during a parallel operation"); + ActiveSnapshot->as_snap->curcid = curcid; } /* @@ -1247,7 +1296,7 @@ ImportSnapshot(const char *idstr) errmsg("cannot import a snapshot from a different database"))); /* OK, install the snapshot */ - SetTransactionSnapshot(&snapshot, src_xid); + SetTransactionSnapshot(&snapshot, src_xid, NULL); } /* @@ -1350,3 +1399,162 @@ HistoricSnapshotGetTupleCids(void) Assert(HistoricSnapshotActive()); return tuplecid_data; } + +/* + * EstimateSnapshotSpace + * Returns the size needed to store the given snapshot. + * + * We are exporting only required fields from the Snapshot, stored in + * SerializedSnapshotData. + */ +Size +EstimateSnapshotSpace(Snapshot snap) +{ + Size size; + + Assert(snap != InvalidSnapshot); + Assert(snap->satisfies == HeapTupleSatisfiesMVCC); + + /* Fixed-size header plus the XID array, plus SubXIDs when they are kept. */ + size = add_size(sizeof(SerializedSnapshotData), + mul_size(snap->xcnt, sizeof(TransactionId))); + if (snap->subxcnt > 0 && + (!snap->suboverflowed || snap->takenDuringRecovery)) + size = add_size(size, + mul_size(snap->subxcnt, sizeof(TransactionId))); + + return size; +} + +/* + * SerializeSnapshot + * Dumps the serialized snapshot (extracted from given snapshot) onto the + * memory location at start_address.
+ */ +void +SerializeSnapshot(Snapshot snapshot, Size maxsize, char *start_address) +{ + SerializedSnapshotData *serialized_snapshot; + + /* If the size is small, throw an error */ + if (maxsize < EstimateSnapshotSpace(snapshot)) + elog(ERROR, "not enough space to serialize given snapshot"); + + Assert(snapshot->xcnt >= 0); + Assert(snapshot->subxcnt >= 0); + + serialized_snapshot = (SerializedSnapshotData *) start_address; + + /* Copy all required fields */ + serialized_snapshot->xmin = snapshot->xmin; + serialized_snapshot->xmax = snapshot->xmax; + serialized_snapshot->xcnt = snapshot->xcnt; + serialized_snapshot->subxcnt = snapshot->subxcnt; + serialized_snapshot->suboverflowed = snapshot->suboverflowed; + serialized_snapshot->takenDuringRecovery = snapshot->takenDuringRecovery; + serialized_snapshot->curcid = snapshot->curcid; + + /* + * Ignore the SubXID array if it has overflowed, unless the snapshot + * was taken during recovery - in that case, top-level XIDs are in subxip + * as well, and we mustn't lose them. + */ + if (serialized_snapshot->suboverflowed && !snapshot->takenDuringRecovery) + serialized_snapshot->subxcnt = 0; + + /* Copy XID array */ + if (snapshot->xcnt > 0) + memcpy((TransactionId *) (serialized_snapshot + 1), + snapshot->xip, snapshot->xcnt * sizeof(TransactionId)); + + /* + * Copy SubXID array, but only if the serialized count says we keep it: + * the count was zeroed above for an overflowed non-recovery snapshot, + * and EstimateSnapshotSpace reserved no room for the array in that case, + * so testing snapshot->subxcnt here would overrun the buffer. + */ + if (serialized_snapshot->subxcnt > 0) + { + Size subxipoff = sizeof(SerializedSnapshotData) + + snapshot->xcnt * sizeof(TransactionId); + + memcpy((TransactionId *) ((char *) serialized_snapshot + subxipoff), + snapshot->subxip, snapshot->subxcnt * sizeof(TransactionId)); + } +} + +/* + * RestoreSnapshot + * Restore a serialized snapshot from the specified address.
+ * + * The copy is palloc'd in TopTransactionContext and has initial refcounts set + * to 0. The returned snapshot has the copied flag set. + * + * This function does not install the snapshot as the transaction snapshot; + * call RestoreTransactionSnapshot for that. + */ +Snapshot +RestoreSnapshot(char *start_address) +{ + SerializedSnapshotData *serialized_snapshot; + Size size; + Snapshot snapshot; + TransactionId *serialized_xids; + + serialized_snapshot = (SerializedSnapshotData *) start_address; + serialized_xids = (TransactionId *) + (start_address + sizeof(SerializedSnapshotData)); + + /* We allocate any XID arrays needed in the same palloc block. */ + size = sizeof(SnapshotData) + + serialized_snapshot->xcnt * sizeof(TransactionId) + + serialized_snapshot->subxcnt * sizeof(TransactionId); + + /* Copy all required fields */ + snapshot = (Snapshot) MemoryContextAlloc(TopTransactionContext, size); + snapshot->satisfies = HeapTupleSatisfiesMVCC; + snapshot->xmin = serialized_snapshot->xmin; + snapshot->xmax = serialized_snapshot->xmax; + snapshot->xip = NULL; + snapshot->xcnt = serialized_snapshot->xcnt; + snapshot->subxip = NULL; + snapshot->subxcnt = serialized_snapshot->subxcnt; + snapshot->suboverflowed = serialized_snapshot->suboverflowed; + snapshot->takenDuringRecovery = serialized_snapshot->takenDuringRecovery; + snapshot->curcid = serialized_snapshot->curcid; + + /* Copy XIDs, if present. */ + if (serialized_snapshot->xcnt > 0) + { + snapshot->xip = (TransactionId *) (snapshot + 1); + memcpy(snapshot->xip, serialized_xids, + serialized_snapshot->xcnt * sizeof(TransactionId)); + } + + /* Copy SubXIDs, if present; xip is still NULL when xcnt is 0, so don't base on it. */ + if (serialized_snapshot->subxcnt > 0) + { + snapshot->subxip = (TransactionId *) (snapshot + 1) + serialized_snapshot->xcnt; + memcpy(snapshot->subxip, serialized_xids + serialized_snapshot->xcnt, + serialized_snapshot->subxcnt * sizeof(TransactionId)); + } + + /* Set the copied flag so that the caller will set refcounts correctly.
*/ + snapshot->regd_count = 0; + snapshot->active_count = 0; + snapshot->copied = true; + + return snapshot; +} + +/* + * Install a restored snapshot as the transaction snapshot. + * + * The second argument is of type void * so that snapmgr.h need not include + * the declaration for PGPROC. + */ +void +RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc) +{ + SetTransactionSnapshot(snapshot, InvalidTransactionId, master_pgproc); +} diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h new file mode 100644 index 0000000..b651218 --- /dev/null +++ b/src/include/access/parallel.h @@ -0,0 +1,59 @@ +/*------------------------------------------------------------------------- + * + * parallel.h + * Infrastructure for launching parallel workers + * + * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/parallel.h + * + *------------------------------------------------------------------------- + */ + +#ifndef PARALLEL_H +#define PARALLEL_H + +#include "lib/ilist.h" +#include "postmaster/bgworker.h" +#include "storage/shm_mq.h" +#include "storage/shm_toc.h" + +typedef void (*parallel_worker_main_type)(dsm_segment *seg, shm_toc *toc); + +typedef struct ParallelWorkerInfo +{ + BackgroundWorkerHandle *bgwhandle; + shm_mq_handle *error_mqh; +} ParallelWorkerInfo; + +typedef struct ParallelContext +{ + dlist_node node; + SubTransactionId subid; + int nworkers; + parallel_worker_main_type entrypoint; + char *library_name; + char *function_name; + shm_toc_estimator estimator; + dsm_segment *seg; + shm_toc *toc; + ParallelWorkerInfo *worker; +} ParallelContext; + +extern bool ParallelMessagePending; + +extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers); +extern ParallelContext *CreateParallelContextForExtension(char *library_name, + char *function_name, int nworkers); +extern void
InitializeParallelDSM(ParallelContext *); +extern void LaunchParallelWorkers(ParallelContext *); +extern void WaitForParallelWorkersToFinish(ParallelContext *); +extern void DestroyParallelContext(ParallelContext *); +extern bool ParallelContextActive(void); + +extern void HandleParallelMessageInterrupt(bool signal_handler); +extern void AtEOXact_Parallel(bool isCommit); +extern void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId); + +#endif /* PARALLEL_H */ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index b018aa4..91d9d73 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -77,9 +77,12 @@ extern bool MyXactAccessedTempRel; typedef enum { XACT_EVENT_COMMIT, + XACT_EVENT_PARALLEL_COMMIT, XACT_EVENT_ABORT, + XACT_EVENT_PARALLEL_ABORT, XACT_EVENT_PREPARE, XACT_EVENT_PRE_COMMIT, + XACT_EVENT_PARALLEL_PRE_COMMIT, XACT_EVENT_PRE_PREPARE } XactEvent; @@ -241,6 +244,10 @@ extern void BeginInternalSubTransaction(char *name); extern void ReleaseCurrentSubTransaction(void); extern void RollbackAndReleaseCurrentSubTransaction(void); extern bool IsSubTransaction(void); +extern Size EstimateTransactionStateSpace(void); +extern void SerializeTransactionState(Size maxsize, char *start_address); +extern void StartParallelWorkerTransaction(char *tstatespace); +extern void EndParallelWorkerTransaction(void); extern bool IsTransactionBlock(void); extern bool IsTransactionOrTransactionBlock(void); extern char TransactionBlockStatusCode(void); @@ -260,4 +267,8 @@ extern void xact_redo(XLogReaderState *record); extern void xact_desc(StringInfo buf, XLogReaderState *record); extern const char *xact_identify(uint8 info); +extern void EnterParallelMode(void); +extern void ExitParallelMode(void); +extern bool IsInParallelMode(void); + #endif /* XACT_H */ diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h index f8e68c9..78583b3 100644 --- a/src/include/libpq/pqmq.h +++ b/src/include/libpq/pqmq.h @@ -17,6 +17,7 @@ 
#include "storage/shm_mq.h" extern void pq_redirect_to_shm_mq(shm_mq *, shm_mq_handle *); +extern void pq_set_parallel_master(pid_t pid, BackendId backend_id); extern void pq_parse_errornotice(StringInfo str, ErrorData *edata); diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 1558a75..fb7754f 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -258,6 +258,7 @@ extern void check_stack_depth(void); /* in tcop/utility.c */ extern void PreventCommandIfReadOnly(const char *cmdname); +extern void PreventCommandIfParallelMode(const char *cmdname); extern void PreventCommandDuringRecovery(const char *cmdname); /* in utils/misc/guc.c */ @@ -290,7 +291,7 @@ extern bool InLocalUserIdChange(void); extern bool InSecurityRestrictedOperation(void); extern void GetUserIdAndContext(Oid *userid, bool *sec_def_context); extern void SetUserIdAndContext(Oid userid, bool sec_def_context); -extern void InitializeSessionUserId(const char *rolename); +extern void InitializeSessionUserId(const char *rolename, Oid useroid); extern void InitializeSessionUserIdStandalone(void); extern void SetSessionAuthorization(Oid userid, bool is_superuser); extern Oid GetCurrentRoleId(void); @@ -391,7 +392,7 @@ extern AuxProcType MyAuxProcType; extern void pg_split_opts(char **argv, int *argcp, char *optstr); extern void InitializeMaxBackends(void); extern void InitPostgres(const char *in_dbname, Oid dboid, const char *username, - char *out_dbname); + Oid useroid, char *out_dbname); extern void BaseInit(void); /* in utils/init/miscinit.c */ diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h index a3b3d5f..273cecb 100644 --- a/src/include/postmaster/bgworker.h +++ b/src/include/postmaster/bgworker.h @@ -130,6 +130,9 @@ extern PGDLLIMPORT BackgroundWorker *MyBgworkerEntry; */ extern void BackgroundWorkerInitializeConnection(char *dbname, char *username); +/* Just like the above, but specifying database and user by OID. 
*/ +extern void BackgroundWorkerInitializeConnectionByOid(Oid dboid, Oid useroid); + /* Block/unblock signals in a background worker process */ extern void BackgroundWorkerBlockSignals(void); extern void BackgroundWorkerUnblockSignals(void); diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 0c4611b..9f4e4b9 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -46,6 +46,7 @@ extern Snapshot GetSnapshotData(Snapshot snapshot); extern bool ProcArrayInstallImportedXmin(TransactionId xmin, TransactionId sourcexid); +extern bool ProcArrayInstallRestoredXmin(TransactionId xmin, PGPROC *proc); extern RunningTransactions GetRunningTransactionData(void); diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index c625562..3046e52 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -31,6 +31,7 @@ typedef enum { PROCSIG_CATCHUP_INTERRUPT, /* sinval catchup interrupt */ PROCSIG_NOTIFY_INTERRUPT, /* listen/notify interrupt */ + PROCSIG_PARALLEL_MESSAGE, /* message from cooperating parallel backend */ /* Recovery conflict reasons */ PROCSIG_RECOVERY_CONFLICT_DATABASE, diff --git a/src/include/utils/combocid.h b/src/include/utils/combocid.h index 9e482ff..232f729 100644 --- a/src/include/utils/combocid.h +++ b/src/include/utils/combocid.h @@ -21,5 +21,8 @@ */ extern void AtEOXact_ComboCid(void); +extern void RestoreComboCIDState(char *comboCIDstate); +extern void SerializeComboCIDState(Size maxsize, char *start_address); +extern Size EstimateComboCIDStateSpace(void); #endif /* COMBOCID_H */ diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h index abe7016..7efd427 100644 --- a/src/include/utils/snapmgr.h +++ b/src/include/utils/snapmgr.h @@ -64,4 +64,10 @@ extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids) extern void TeardownHistoricSnapshot(bool is_error); extern bool HistoricSnapshotActive(void); 
+extern Size EstimateSnapshotSpace(Snapshot snapshot); +extern void SerializeSnapshot(Snapshot snapshot, Size maxsize, + char *start_address); +extern Snapshot RestoreSnapshot(char *start_address); +extern void RestoreTransactionSnapshot(Snapshot snapshot, void *master_pgproc); + #endif /* SNAPMGR_H */