diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml index ff31e7537e6..fd49ef3b5c7 100644 --- a/doc/src/sgml/parallel.sgml +++ b/doc/src/sgml/parallel.sgml @@ -177,13 +177,6 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%'; using a very large number of processes. - - - - The transaction isolation level is serializable. This is - a limitation of the current implementation. - - @@ -235,16 +228,6 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%'; making it ineligible for parallel query. - - - - The transaction isolation level is serializable. This situation - does not normally arise, because parallel query plans are not - generated when the transaction isolation level is serializable. - However, it can happen if the transaction isolation level is changed to - serializable after the plan is generated and before it is executed. - - diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 17b10383e44..7a24b5ec33b 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -27,6 +27,7 @@ #include "optimizer/planmain.h" #include "pgstat.h" #include "storage/ipc.h" +#include "storage/predicate_internals.h" #include "storage/sinval.h" #include "storage/spin.h" #include "tcop/tcopprot.h" @@ -77,6 +78,7 @@ typedef struct FixedParallelState PGPROC *parallel_master_pgproc; pid_t parallel_master_pid; BackendId parallel_master_backend_id; + SERIALIZABLEXACT *parallel_master_serializablexact; /* Mutex protects remaining fields. */ slock_t mutex; @@ -152,14 +154,6 @@ CreateParallelContext(const char *library_name, const char *function_name, if (dynamic_shared_memory_type == DSM_IMPL_NONE) nworkers = 0; - /* - * If we are running under serializable isolation, we can't use parallel - * workers, at least not until somebody enhances that mechanism to be - * parallel-aware. 
- */ - if (IsolationIsSerializable()) - nworkers = 0; - /* We might be running in a short-lived memory context. */ oldcontext = MemoryContextSwitchTo(TopTransactionContext); @@ -281,6 +275,7 @@ InitializeParallelDSM(ParallelContext *pcxt) fps->parallel_master_pgproc = MyProc; fps->parallel_master_pid = MyProcPid; fps->parallel_master_backend_id = MyBackendId; + fps->parallel_master_serializablexact = GetSerializableXact(); SpinLockInit(&fps->mutex); fps->last_xlog_end = 0; shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps); @@ -1089,6 +1084,9 @@ ParallelWorkerMain(Datum main_arg) /* Set ParallelMasterBackendId so we know how to address temp relations. */ ParallelMasterBackendId = fps->parallel_master_backend_id; + /* Use the leader's SERIALIZABLEXACT. */ + SetSerializableXact(fps->parallel_master_serializablexact); + /* * We've initialized all of our state now; nothing should change * hereafter. diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 2988c1181b9..6b213f9fe0a 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -261,14 +261,6 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) * parallel worker. We might eventually be able to relax this * restriction, but for now it seems best not to have parallel workers * trying to create their own parallel workers. - * - * We can't use parallelism in serializable mode because the predicate - * locking code is not parallel-aware. It's not catastrophic if someone - * tries to run a parallel plan in serializable mode; it just won't get - * any workers and will run serially. But it seems like a good heuristic - * to assume that the same serialization level will be in effect at plan - * time and execution time, so don't generate a parallel plan if we're in - * serializable mode. 
*/ if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 && IsUnderPostmaster && @@ -276,8 +268,7 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) parse->commandType == CMD_SELECT && !parse->hasModifyingCTE && max_parallel_workers_per_gather > 0 && - !IsParallelWorker() && - !IsolationIsSerializable()) + !IsParallelWorker()) { /* all the cheap tests pass, so scan the query tree */ glob->maxParallelHazard = max_parallel_hazard(parse); diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 82a1cf5150b..21beb8d463c 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -494,7 +494,7 @@ RegisterLWLockTranches(void) if (LWLockTrancheArray == NULL) { - LWLockTranchesAllocated = 64; + LWLockTranchesAllocated = 128; LWLockTrancheArray = (char **) MemoryContextAllocZero(TopMemoryContext, LWLockTranchesAllocated * sizeof(char *)); @@ -511,6 +511,7 @@ RegisterLWLockTranches(void) LWLockRegisterTranche(LWTRANCHE_PARALLEL_QUERY_DSA, "parallel_query_dsa"); LWLockRegisterTranche(LWTRANCHE_TBM, "tbm"); + LWLockRegisterTranche(LWTRANCHE_SXACT, "sxact"); /* Register named tranches. */ for (i = 0; i < NamedLWLockTrancheRequests; i++) diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index a4cb4d33add..f43cc68e78d 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -97,7 +97,9 @@ * - All transactions share this single lock (with no partitioning). * - There is never a need for a process other than the one running * an active transaction to walk the list of locks held by that - * transaction. + * transaction, except parallel query workers sharing the leader's + * transaction. In the parallel case, an extra per-sxact lock is + * taken; see below. 
* - It is relatively infrequent that another process needs to * modify the list for a transaction, but it does happen for such * things as index page splits for pages with predicate locks and @@ -116,6 +118,12 @@ * than its own active transaction must acquire an exclusive * lock. * + * SERIALIZABLE_XACT's member 'lock' + * - Protects the linked list of locks held by a transaction. Only + * needed for parallel mode, where multiple backends share the + * same SERIALIZABLEXACT object. Not needed if + * SerializablePredicateLockListLock is held exclusively. + * * FirstPredicateLockMgrLock based partition locks * - The same lock protects a target, all locks on that target, and * the linked list of locks on the target.. @@ -184,6 +192,7 @@ #include "postgres.h" #include "access/htup_details.h" +#include "access/parallel.h" #include "access/slru.h" #include "access/subtrans.h" #include "access/transam.h" @@ -1810,6 +1819,7 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot, SHMQueueInit(&(sxact->predicateLocks)); SHMQueueElemInit(&(sxact->finishedLink)); sxact->flags = 0; + LWLockInitialize(&sxact->lock, LWTRANCHE_SXACT); if (XactReadOnly) { sxact->flags |= SXACT_FLAG_READ_ONLY; @@ -2092,6 +2102,14 @@ RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash) Assert(LWLockHeldByMe(SerializablePredicateLockListLock)); + if (IsInParallelMode()) + { + Assert(LWLockHeldByMeInMode(SerializablePredicateLockListLock, + LW_EXCLUSIVE) || + LWLockHeldByMeInMode(&MySerializableXact->lock, + LW_EXCLUSIVE)); + } + /* Can't remove it until no locks at this target. */ if (!SHMQueueEmpty(&target->predicateLocks)) return; @@ -2109,7 +2127,9 @@ RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash) * This implementation is assuming that the usage of each target tag field * is uniform. No need to make this hard if we don't have to. 
* - * We aren't acquiring lightweight locks for the predicate lock or lock + * We acquire an LWLock in the case of parallel mode, because worker + * backends have access to the leader's SERIALIZABLEXACT. Otherwise, + * we aren't acquiring lightweight locks for the predicate lock or lock * target structures associated with this transaction unless we're going * to modify them, because no other process is permitted to modify our * locks. @@ -2122,6 +2142,8 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag) LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED); sxact = MySerializableXact; + if (IsInParallelMode()) + LWLockAcquire(&sxact->lock, LW_EXCLUSIVE); predlock = (PREDICATELOCK *) SHMQueueNext(&(sxact->predicateLocks), &(sxact->predicateLocks), @@ -2175,6 +2197,8 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag) predlock = nextpredlock; } + if (IsInParallelMode()) + LWLockRelease(&sxact->lock); LWLockRelease(SerializablePredicateLockListLock); } @@ -2373,6 +2397,8 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag, partitionLock = PredicateLockHashPartitionLock(targettaghash); LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED); + if (IsInParallelMode()) + LWLockAcquire(&sxact->lock, LW_EXCLUSIVE); LWLockAcquire(partitionLock, LW_EXCLUSIVE); /* Make sure that the target is represented. 
*/ @@ -2410,6 +2436,8 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag, } LWLockRelease(partitionLock); + if (IsInParallelMode()) + LWLockRelease(&sxact->lock); LWLockRelease(SerializablePredicateLockListLock); } @@ -2597,7 +2625,8 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash) PREDICATELOCK *nextpredlock; bool found; - Assert(LWLockHeldByMe(SerializablePredicateLockListLock)); + Assert(LWLockHeldByMeInMode(SerializablePredicateLockListLock, + LW_EXCLUSIVE)); Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash))); predlock = (PREDICATELOCK *) @@ -2657,7 +2686,7 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash) * covers it, or if we are absolutely certain that no one will need to * refer to that lock in the future. * - * Caller must hold SerializablePredicateLockListLock. + * Caller must hold SerializablePredicateLockListLock exclusively. */ static bool TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, @@ -2672,7 +2701,8 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag, bool found; bool outOfShmem = false; - Assert(LWLockHeldByMe(SerializablePredicateLockListLock)); + Assert(LWLockHeldByMeInMode(SerializablePredicateLockListLock, + LW_EXCLUSIVE)); oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag); newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag); @@ -3269,6 +3299,10 @@ ReleasePredicateLocks(bool isCommit) */ bool topLevelIsDeclaredReadOnly; + /* Only leader processes should release predicate locks. 
*/ + if (IsParallelWorker()) + goto cleanup; + if (MySerializableXact == InvalidSerializableXact) { Assert(LocalPredicateLockHash == NULL); @@ -3555,6 +3589,7 @@ ReleasePredicateLocks(bool isCommit) MySerializableXact = InvalidSerializableXact; MyXactDidWrite = false; +cleanup: /* Delete per-transaction lock table */ if (LocalPredicateLockHash != NULL) { @@ -4244,6 +4279,8 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) PREDICATELOCK *rmpredlock; LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED); + if (IsInParallelMode()) + LWLockAcquire(&MySerializableXact->lock, LW_EXCLUSIVE); LWLockAcquire(partitionLock, LW_EXCLUSIVE); LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE); @@ -4278,6 +4315,8 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag) LWLockRelease(SerializableXactHashLock); LWLockRelease(partitionLock); + if (IsInParallelMode()) + LWLockRelease(&MySerializableXact->lock); LWLockRelease(SerializablePredicateLockListLock); if (rmpredlock != NULL) @@ -4826,6 +4865,11 @@ AtPrepare_PredicateLocks(void) */ LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED); + /* + * No need to take sxact->lock in parallel mode because there cannot be + * any parallel workers running while we are preparing a transaction. + */ + predlock = (PREDICATELOCK *) SHMQueueNext(&(sxact->predicateLocks), &(sxact->predicateLocks), @@ -5034,3 +5078,22 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info, CreatePredicateLock(&lockRecord->target, targettaghash, sxact); } } + +/* + * Accessor to allow parallel leaders to export the current SERIALIZABLEXACT + * to parallel workers. + */ +SERIALIZABLEXACT * +GetSerializableXact(void) +{ + return MySerializableXact; +} + +/* + * Allow parallel workers to import the leader's SERIALIZABLEXACT. 
+ */ +void +SetSerializableXact(SERIALIZABLEXACT *sxact) +{ + MySerializableXact = sxact; +} diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 3d16132c88f..d9640139ae5 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -213,6 +213,7 @@ typedef enum BuiltinTrancheIds LWTRANCHE_PREDICATE_LOCK_MANAGER, LWTRANCHE_PARALLEL_QUERY_DSA, LWTRANCHE_TBM, + LWTRANCHE_SXACT, LWTRANCHE_FIRST_USER_DEFINED } BuiltinTrancheIds; diff --git a/src/include/storage/predicate_internals.h b/src/include/storage/predicate_internals.h index 89874a5c3b6..64560d4d3a4 100644 --- a/src/include/storage/predicate_internals.h +++ b/src/include/storage/predicate_internals.h @@ -15,6 +15,7 @@ #define PREDICATE_INTERNALS_H #include "storage/lock.h" +#include "storage/lwlock.h" /* * Commit number. @@ -91,6 +92,9 @@ typedef struct SERIALIZABLEXACT SHM_QUEUE finishedLink; /* list link in * FinishedSerializableTransactions */ + /* lock to protect predicateLocks list in parallel mode */ + LWLock lock; + /* * for r/o transactions: list of concurrent r/w transactions that we could * potentially have conflicts with, and vice versa for r/w transactions @@ -475,5 +479,7 @@ typedef struct TwoPhasePredicateRecord extern PredicateLockData *GetPredicateLockStatusData(void); extern int GetSafeSnapshotBlockingPids(int blocked_pid, int *output, int output_size); +extern SERIALIZABLEXACT *GetSerializableXact(void); +extern void SetSerializableXact(SERIALIZABLEXACT *sxact); #endif /* PREDICATE_INTERNALS_H */ diff --git a/src/test/isolation/expected/serializable-parallel.out b/src/test/isolation/expected/serializable-parallel.out new file mode 100644 index 00000000000..f43aa6a2990 --- /dev/null +++ b/src/test/isolation/expected/serializable-parallel.out @@ -0,0 +1,44 @@ +Parsed test spec with 3 sessions + +starting permutation: s2rx s2ry s1ry s1wy s1c s2wx s2c s3c +step s2rx: SELECT balance FROM bank_account WHERE id = 'X'; +balance + +0 +step s2ry: SELECT balance 
FROM bank_account WHERE id = 'Y'; +balance + +0 +step s1ry: SELECT balance FROM bank_account WHERE id = 'Y'; +balance + +0 +step s1wy: UPDATE bank_account SET balance = 20 WHERE id = 'Y'; +step s1c: COMMIT; +step s2wx: UPDATE bank_account SET balance = -11 WHERE id = 'X'; +step s2c: COMMIT; +step s3c: COMMIT; + +starting permutation: s2rx s2ry s1ry s1wy s1c s3r s3c s2wx +step s2rx: SELECT balance FROM bank_account WHERE id = 'X'; +balance + +0 +step s2ry: SELECT balance FROM bank_account WHERE id = 'Y'; +balance + +0 +step s1ry: SELECT balance FROM bank_account WHERE id = 'Y'; +balance + +0 +step s1wy: UPDATE bank_account SET balance = 20 WHERE id = 'Y'; +step s1c: COMMIT; +step s3r: SELECT id, balance FROM bank_account WHERE id IN ('X', 'Y') ORDER BY id; +id balance + +X 0 +Y 20 +step s3c: COMMIT; +step s2wx: UPDATE bank_account SET balance = -11 WHERE id = 'X'; +ERROR: could not serialize access due to read/write dependencies among transactions diff --git a/src/test/isolation/isolation_schedule b/src/test/isolation/isolation_schedule index 32c965b2a02..e428357e772 100644 --- a/src/test/isolation/isolation_schedule +++ b/src/test/isolation/isolation_schedule @@ -62,3 +62,4 @@ test: sequence-ddl test: async-notify test: vacuum-reltuples test: timeouts +test: serializable-parallel diff --git a/src/test/isolation/specs/serializable-parallel.spec b/src/test/isolation/specs/serializable-parallel.spec new file mode 100644 index 00000000000..0e7c2c7c1fa --- /dev/null +++ b/src/test/isolation/specs/serializable-parallel.spec @@ -0,0 +1,48 @@ +# The example from the paper "A read-only transaction anomaly under snapshot +# isolation"[1]. +# +# Here we test that serializable snapshot isolation (SERIALIZABLE) doesn't +# suffer from the anomaly, because s2 is aborted upon detection of a cycle. +# In this case the read only query s3 happens to be running in a parallel +# worker. 
+# +# [1] http://www.cs.umb.edu/~poneil/ROAnom.pdf + +setup +{ + CREATE TABLE bank_account (id TEXT PRIMARY KEY, balance DECIMAL NOT NULL); + INSERT INTO bank_account (id, balance) VALUES ('X', 0), ('Y', 0); +} + +teardown +{ + DROP TABLE bank_account; +} + +session "s1" +setup { BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; } +step "s1ry" { SELECT balance FROM bank_account WHERE id = 'Y'; } +step "s1wy" { UPDATE bank_account SET balance = 20 WHERE id = 'Y'; } +step "s1c" { COMMIT; } + +session "s2" +setup { BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; } +step "s2rx" { SELECT balance FROM bank_account WHERE id = 'X'; } +step "s2ry" { SELECT balance FROM bank_account WHERE id = 'Y'; } +step "s2wx" { UPDATE bank_account SET balance = -11 WHERE id = 'X'; } +step "s2c" { COMMIT; } + +session "s3" +setup { + BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; + SET max_parallel_workers_per_gather = 2; + SET force_parallel_mode = on; + } +step "s3r" { SELECT id, balance FROM bank_account WHERE id IN ('X', 'Y') ORDER BY id; } +step "s3c" { COMMIT; } + +# without s3, s1 and s2 commit +permutation "s2rx" "s2ry" "s1ry" "s1wy" "s1c" "s2wx" "s2c" "s3c" + +# once s3 observes the data committed by s1, a cycle is created and s2 aborts +permutation "s2rx" "s2ry" "s1ry" "s1wy" "s1c" "s3r" "s3c" "s2wx"