From ef7898c6512c91352c68eaca7b10e7811449861c Mon Sep 17 00:00:00 2001 From: Hari Babu Date: Wed, 3 Jan 2018 17:36:11 +1100 Subject: [PATCH 11/12] Improve tuple locking interface Currently, executor code have to traverse heap update chains. That's doesn't seems to be acceptable if we're going to have pluggable storage access methods whose could provide alternative implementations for our MVCC model. New locking function is responsible for finding latest tuple version (if required). EvalPlanQual() is now only responsible for re-evaluating quals, but not for locking tuple. In addition, we've distinguish HeapTupleUpdated and HeapTupleDeleted HTSU_Result's, because in alternative MVCC implementations multiple tuple versions may have same TID, and immutability of TID after update isn't sign of tuple deletion anymore. For the same reason, TID is not pointer to particular tuple version anymore. And in order to point particular tuple version we're going to lock, we've to provide snapshot as well. In heap storage access method, this snapshot is used for assert checking only, but it might be vital for other storage access methods. Similar changes are upcoming to tuple_update() and tuple_delete() interface methods. --- src/backend/access/heap/heapam.c | 125 +++++++------ src/backend/access/heap/heapam_storage.c | 217 +++++++++++++++++++++- src/backend/access/heap/heapam_visibility.c | 23 ++- src/backend/access/storage/storageam.c | 11 +- src/backend/commands/trigger.c | 67 +++---- src/backend/executor/execMain.c | 278 +--------------------------- src/backend/executor/execReplication.c | 36 ++-- src/backend/executor/nodeLockRows.c | 67 +++---- src/backend/executor/nodeModifyTable.c | 152 ++++++++++----- src/include/access/heapam.h | 9 +- src/include/access/storageam.h | 8 +- src/include/access/storageamapi.h | 6 +- src/include/executor/executor.h | 6 +- src/include/nodes/lockoptions.h | 5 + src/include/utils/snapshot.h | 1 + 15 files changed, 516 insertions(+), 495 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index fe083b6bb5..0df5ff8b20 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -3191,6 +3191,7 @@ l1: { Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated || + result == HeapTupleDeleted || result == HeapTupleBeingUpdated); Assert(!(tp.t_data->t_infomask & HEAP_XMAX_INVALID)); hufd->ctid = tp.t_data->t_ctid; @@ -3204,6 +3205,8 @@ l1: UnlockTupleTuplock(relation, &(tp.t_self), LockTupleExclusive); if (vmbuffer != InvalidBuffer) ReleaseBuffer(vmbuffer); + if (result == HeapTupleUpdated && ItemPointerEquals(tid, &hufd->ctid)) + result = HeapTupleDeleted; return result; } @@ -3413,6 +3416,10 @@ simple_heap_delete(Relation relation, ItemPointer tid) elog(ERROR, "tuple concurrently updated"); break; + case HeapTupleDeleted: + elog(ERROR, "tuple concurrently deleted"); + break; + default: elog(ERROR, "unrecognized heap_delete status: %u", result); break; @@ -3832,6 +3839,7 @@ l2: { Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated || + result == HeapTupleDeleted || result == HeapTupleBeingUpdated); Assert(!(oldtup.t_data->t_infomask & HEAP_XMAX_INVALID)); hufd->ctid = oldtup.t_data->t_ctid; @@ -3850,6 +3858,8 @@ l2: bms_free(id_attrs); bms_free(modified_attrs); bms_free(interesting_attrs); + if (result == HeapTupleUpdated && ItemPointerEquals(otid, &hufd->ctid)) + result = HeapTupleDeleted; return result; } @@ -4468,6 +4478,10 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup) elog(ERROR, "tuple concurrently updated"); break; + case HeapTupleDeleted: + elog(ERROR, "tuple concurrently deleted"); + break; + default: elog(ERROR, "unrecognized heap_update status: %u", result); break; @@ -4520,6 +4534,7 @@ get_mxact_status_for_lock(LockTupleMode mode, bool is_update) * HeapTupleInvisible: lock failed because tuple was never visible to us * HeapTupleSelfUpdated: lock failed because tuple updated by self * HeapTupleUpdated: lock failed because tuple updated by other xact + * HeapTupleDeleted: lock failed because tuple deleted by other xact * HeapTupleWouldBlock: lock couldn't be acquired and wait_policy is skip * * In the failure cases other than HeapTupleInvisible, the routine fills @@ -4532,12 +4547,13 @@ get_mxact_status_for_lock(LockTupleMode mode, bool is_update) * See README.tuplock for a thorough explanation of this mechanism. */ HTSU_Result -heap_lock_tuple(Relation relation, ItemPointer tid, StorageTuple * stuple, +heap_lock_tuple(Relation relation, HeapTuple tuple, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, bool follow_updates, Buffer *buffer, HeapUpdateFailureData *hufd) { HTSU_Result result; + ItemPointer tid = &(tuple->t_self); ItemId lp; Page page; Buffer vmbuffer = InvalidBuffer; @@ -4550,9 +4566,6 @@ heap_lock_tuple(Relation relation, ItemPointer tid, StorageTuple * stuple, bool first_time = true; bool have_tuple_lock = false; bool cleared_all_frozen = false; - HeapTupleData tuple; - - Assert(stuple != NULL); *buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); block = ItemPointerGetBlockNumber(tid); @@ -4572,13 +4585,12 @@ heap_lock_tuple(Relation relation, ItemPointer tid, StorageTuple * stuple, lp = PageGetItemId(page, ItemPointerGetOffsetNumber(tid)); Assert(ItemIdIsNormal(lp)); - tuple.t_data = (HeapTupleHeader) PageGetItem(page, lp); - tuple.t_len = ItemIdGetLength(lp); - tuple.t_tableOid = RelationGetRelid(relation); - ItemPointerCopy(tid, &tuple.t_self); + tuple->t_data = (HeapTupleHeader) PageGetItem(page, lp); + tuple->t_len = ItemIdGetLength(lp); + tuple->t_tableOid = RelationGetRelid(relation); l3: - result = HeapTupleSatisfiesUpdate(&tuple, cid, *buffer); + result = HeapTupleSatisfiesUpdate(tuple, cid, *buffer); if (result == HeapTupleInvisible) { @@ -4591,7 +4603,7 @@ l3: result = HeapTupleInvisible; goto out_locked; } - else if (result == HeapTupleBeingUpdated || result == HeapTupleUpdated) + else if (result == HeapTupleBeingUpdated || result == HeapTupleUpdated || result == HeapTupleDeleted) { TransactionId xwait; uint16 infomask; @@ -4600,10 +4612,10 @@ l3: ItemPointerData t_ctid; /* must copy state data before unlocking buffer */ - xwait = HeapTupleHeaderGetRawXmax(tuple.t_data); - infomask = tuple.t_data->t_infomask; - infomask2 = tuple.t_data->t_infomask2; - ItemPointerCopy(&tuple.t_data->t_ctid, &t_ctid); + xwait = HeapTupleHeaderGetRawXmax(tuple->t_data); + infomask = tuple->t_data->t_infomask; + infomask2 = tuple->t_data->t_infomask2; + ItemPointerCopy(&tuple->t_data->t_ctid, &t_ctid); LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); @@ -4735,7 +4747,7 @@ l3: { HTSU_Result res; - res = heap_lock_updated_tuple(relation, &tuple, &t_ctid, + res = heap_lock_updated_tuple(relation, tuple, &t_ctid, GetCurrentTransactionId(), mode); if (res != HeapTupleMayBeUpdated) @@ -4756,8 +4768,8 @@ l3: * now need to follow the update chain to lock the new * versions. */ - if (!HeapTupleHeaderIsOnlyLocked(tuple.t_data) && - ((tuple.t_data->t_infomask2 & HEAP_KEYS_UPDATED) || + if (!HeapTupleHeaderIsOnlyLocked(tuple->t_data) && + ((tuple->t_data->t_infomask2 & HEAP_KEYS_UPDATED) || !updated)) goto l3; @@ -4788,8 +4800,8 @@ l3: * Make sure it's still an appropriate lock, else start over. * See above about allowing xmax to change. */ - if (!HEAP_XMAX_IS_LOCKED_ONLY(tuple.t_data->t_infomask) || - HEAP_XMAX_IS_EXCL_LOCKED(tuple.t_data->t_infomask)) + if (!HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_data->t_infomask) || + HEAP_XMAX_IS_EXCL_LOCKED(tuple->t_data->t_infomask)) goto l3; require_sleep = false; } @@ -4811,8 +4823,8 @@ l3: * meantime, start over. */ LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); - if (xmax_infomask_changed(tuple.t_data->t_infomask, infomask) || - !TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple.t_data), + if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) || + !TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data), xwait)) goto l3; @@ -4825,9 +4837,9 @@ l3: LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); /* if the xmax changed in the meantime, start over */ - if (xmax_infomask_changed(tuple.t_data->t_infomask, infomask) || + if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) || !TransactionIdEquals( - HeapTupleHeaderGetRawXmax(tuple.t_data), + HeapTupleHeaderGetRawXmax(tuple->t_data), xwait)) goto l3; /* otherwise, we're good */ @@ -4852,11 +4864,11 @@ l3: { /* ... but if the xmax changed in the meantime, start over */ LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); - if (xmax_infomask_changed(tuple.t_data->t_infomask, infomask) || - !TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple.t_data), + if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) || + !TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data), xwait)) goto l3; - Assert(HEAP_XMAX_IS_LOCKED_ONLY(tuple.t_data->t_infomask)); + Assert(HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_data->t_infomask)); require_sleep = false; } @@ -4871,7 +4883,7 @@ l3: * or we must wait for the locking transaction or multixact; so below * we ensure that we grab buffer lock after the sleep. */ - if (require_sleep && result == HeapTupleUpdated) + if (require_sleep && (result == HeapTupleUpdated || result == HeapTupleDeleted)) { LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); goto failed; @@ -4913,7 +4925,7 @@ l3: { case LockWaitBlock: MultiXactIdWait((MultiXactId) xwait, status, infomask, - relation, &tuple.t_self, XLTW_Lock, NULL); + relation, &tuple->t_self, XLTW_Lock, NULL); break; case LockWaitSkip: if (!ConditionalMultiXactIdWait((MultiXactId) xwait, @@ -4954,7 +4966,7 @@ l3: switch (wait_policy) { case LockWaitBlock: - XactLockTableWait(xwait, relation, &tuple.t_self, + XactLockTableWait(xwait, relation, &tuple->t_self, XLTW_Lock); break; case LockWaitSkip: @@ -4981,7 +4993,7 @@ l3: { HTSU_Result res; - res = heap_lock_updated_tuple(relation, &tuple, &t_ctid, + res = heap_lock_updated_tuple(relation, tuple, &t_ctid, GetCurrentTransactionId(), mode); if (res != HeapTupleMayBeUpdated) @@ -5000,8 +5012,8 @@ l3: * other xact could update this tuple before we get to this point. * Check for xmax change, and start over if so. */ - if (xmax_infomask_changed(tuple.t_data->t_infomask, infomask) || - !TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple.t_data), + if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) || + !TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data), xwait)) goto l3; @@ -5015,7 +5027,7 @@ l3: * don't check for this in the multixact case, because some * locker transactions might still be running. */ - UpdateXmaxHintBits(tuple.t_data, *buffer, xwait); + UpdateXmaxHintBits(tuple->t_data, *buffer, xwait); } } @@ -5027,10 +5039,12 @@ l3: * at all for whatever reason. */ if (!require_sleep || - (tuple.t_data->t_infomask & HEAP_XMAX_INVALID) || - HEAP_XMAX_IS_LOCKED_ONLY(tuple.t_data->t_infomask) || - HeapTupleHeaderIsOnlyLocked(tuple.t_data)) + (tuple->t_data->t_infomask & HEAP_XMAX_INVALID) || + HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_data->t_infomask) || + HeapTupleHeaderIsOnlyLocked(tuple->t_data)) result = HeapTupleMayBeUpdated; + else if (ItemPointerEquals(&tuple->t_self, &tuple->t_data->t_ctid)) + result = HeapTupleDeleted; else result = HeapTupleUpdated; } @@ -5039,12 +5053,12 @@ failed: if (result != HeapTupleMayBeUpdated) { Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated || - result == HeapTupleWouldBlock); - Assert(!(tuple.t_data->t_infomask & HEAP_XMAX_INVALID)); - hufd->ctid = tuple.t_data->t_ctid; - hufd->xmax = HeapTupleHeaderGetUpdateXid(tuple.t_data); + result == HeapTupleWouldBlock || result == HeapTupleDeleted); + Assert(!(tuple->t_data->t_infomask & HEAP_XMAX_INVALID)); + hufd->ctid = tuple->t_data->t_ctid; + hufd->xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data); if (result == HeapTupleSelfUpdated) - hufd->cmax = HeapTupleHeaderGetCmax(tuple.t_data); + hufd->cmax = HeapTupleHeaderGetCmax(tuple->t_data); else hufd->cmax = InvalidCommandId; goto out_locked; @@ -5067,8 +5081,8 @@ failed: goto l3; } - xmax = HeapTupleHeaderGetRawXmax(tuple.t_data); - old_infomask = tuple.t_data->t_infomask; + xmax = HeapTupleHeaderGetRawXmax(tuple->t_data); + old_infomask = tuple->t_data->t_infomask; /* * If this is the first possibly-multixact-able operation in the current @@ -5085,7 +5099,7 @@ failed: * not modify the tuple just yet, because that would leave it in the wrong * state if multixact.c elogs. */ - compute_new_xmax_infomask(xmax, old_infomask, tuple.t_data->t_infomask2, + compute_new_xmax_infomask(xmax, old_infomask, tuple->t_data->t_infomask2, GetCurrentTransactionId(), mode, false, &xid, &new_infomask, &new_infomask2); @@ -5101,13 +5115,13 @@ failed: * Also reset the HOT UPDATE bit, but only if there's no update; otherwise * we would break the HOT chain. */ - tuple.t_data->t_infomask &= ~HEAP_XMAX_BITS; - tuple.t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED; - tuple.t_data->t_infomask |= new_infomask; - tuple.t_data->t_infomask2 |= new_infomask2; + tuple->t_data->t_infomask &= ~HEAP_XMAX_BITS; + tuple->t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED; + tuple->t_data->t_infomask |= new_infomask; + tuple->t_data->t_infomask2 |= new_infomask2; if (HEAP_XMAX_IS_LOCKED_ONLY(new_infomask)) - HeapTupleHeaderClearHotUpdated(tuple.t_data); - HeapTupleHeaderSetXmax(tuple.t_data, xid); + HeapTupleHeaderClearHotUpdated(tuple->t_data); + HeapTupleHeaderSetXmax(tuple->t_data, xid); /* * Make sure there is no forward chain link in t_ctid. Note that in the @@ -5117,7 +5131,7 @@ failed: * the tuple as well. */ if (HEAP_XMAX_IS_LOCKED_ONLY(new_infomask)) - tuple.t_data->t_ctid = *tid; + tuple->t_data->t_ctid = *tid; /* Clear only the all-frozen bit on visibility map if needed */ if (PageIsAllVisible(page) && @@ -5148,10 +5162,10 @@ failed: XLogBeginInsert(); XLogRegisterBuffer(0, *buffer, REGBUF_STANDARD); - xlrec.offnum = ItemPointerGetOffsetNumber(&tuple.t_self); + xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self); xlrec.locking_xid = xid; xlrec.infobits_set = compute_infobits(new_infomask, - tuple.t_data->t_infomask2); + tuple->t_data->t_infomask2); xlrec.flags = cleared_all_frozen ? XLH_LOCK_ALL_FROZEN_CLEARED : 0; XLogRegisterData((char *) &xlrec, SizeOfHeapLock); @@ -5185,7 +5199,6 @@ out_unlocked: if (have_tuple_lock) UnlockTupleTuplock(relation, tid, mode); - *stuple = heap_copytuple(&tuple); return result; } @@ -5919,6 +5932,10 @@ next: result = HeapTupleMayBeUpdated; out_locked: + + if (result == HeapTupleUpdated && ItemPointerEquals(&mytup.t_self, &mytup.t_data->t_ctid)) + result = HeapTupleDeleted; + UnlockReleaseBuffer(buf); if (vmbuffer != InvalidBuffer) diff --git a/src/backend/access/heap/heapam_storage.c b/src/backend/access/heap/heapam_storage.c index 8621015846..f8846519d6 100644 --- a/src/backend/access/heap/heapam_storage.c +++ b/src/backend/access/heap/heapam_storage.c @@ -25,8 +25,10 @@ #include "access/rewriteheap.h" #include "access/storageamapi.h" #include "pgstat.h" +#include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/rel.h" +#include "utils/tqual.h" /* ---------------------------------------------------------------- @@ -286,6 +288,219 @@ heapam_fetch_tuple_from_offset(StorageScanDesc sscan, BlockNumber blkno, OffsetN return &(scan->rs_ctup); } +/* + * Locks tuple and fetches its newest version and TID. + * + * relation - table containing tuple + * *tid - TID of tuple to lock (rest of struct need not be valid) + * snapshot - snapshot indentifying required version (used for assert check only) + * *stuple - tuple to be returned + * cid - current command ID (used for visibility test, and stored into + * tuple's cmax if lock is successful) + * mode - indicates if shared or exclusive tuple lock is desired + * wait_policy - what to do if tuple lock is not available + * flags – indicating how do we handle updated tuples + * *hufd - filled in failure cases + * + * Function result may be: + * HeapTupleMayBeUpdated: lock was successfully acquired + * HeapTupleInvisible: lock failed because tuple was never visible to us + * HeapTupleSelfUpdated: lock failed because tuple updated by self + * HeapTupleUpdated: lock failed because tuple updated by other xact + * HeapTupleDeleted: lock failed because tuple deleted by other xact + * HeapTupleWouldBlock: lock couldn't be acquired and wait_policy is skip + * + * In the failure cases other than HeapTupleInvisible, the routine fills + * *hufd with the tuple's t_ctid, t_xmax (resolving a possible MultiXact, + * if necessary), and t_cmax (the last only for HeapTupleSelfUpdated, + * since we cannot obtain cmax from a combocid generated by another + * transaction). + * See comments for struct HeapUpdateFailureData for additional info. + */ +static HTSU_Result +heapam_lock_tuple(Relation relation, ItemPointer tid, Snapshot snapshot, + StorageTuple *stuple, CommandId cid, LockTupleMode mode, + LockWaitPolicy wait_policy, uint8 flags, + HeapUpdateFailureData *hufd) +{ + HTSU_Result result; + HeapTupleData tuple; + Buffer buffer; + + Assert(stuple != NULL); + *stuple = NULL; + + hufd->traversed = false; + +retry: + tuple.t_self = *tid; + result = heap_lock_tuple(relation, &tuple, cid, mode, wait_policy, + (flags & TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS) ? true : false, + &buffer, hufd); + + if (result == HeapTupleUpdated && + (flags & TUPLE_LOCK_FLAG_FIND_LAST_VERSION)) + { + ReleaseBuffer(buffer); + /* Should not encounter speculative tuple on recheck */ + Assert(!HeapTupleHeaderIsSpeculative(tuple.t_data)); + + if (!ItemPointerEquals(&hufd->ctid, &tuple.t_self)) + { + SnapshotData SnapshotDirty; + TransactionId priorXmax; + + /* it was updated, so look at the updated version */ + *tid = hufd->ctid; + /* updated row should have xmin matching this xmax */ + priorXmax = hufd->xmax; + + /* + * fetch target tuple + * + * Loop here to deal with updated or busy tuples + */ + InitDirtySnapshot(SnapshotDirty); + for (;;) + { + if (heap_fetch(relation, tid, &SnapshotDirty, &tuple, &buffer, true, NULL)) + { + /* + * If xmin isn't what we're expecting, the slot must have been + * recycled and reused for an unrelated tuple. This implies that + * the latest version of the row was deleted, so we need do + * nothing. (Should be safe to examine xmin without getting + * buffer's content lock. We assume reading a TransactionId to be + * atomic, and Xmin never changes in an existing tuple, except to + * invalid or frozen, and neither of those can match priorXmax.) + */ + if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple.t_data), + priorXmax)) + { + ReleaseBuffer(buffer); + return HeapTupleDeleted; + } + + /* otherwise xmin should not be dirty... */ + if (TransactionIdIsValid(SnapshotDirty.xmin)) + elog(ERROR, "t_xmin is uncommitted in tuple to be updated"); + + /* + * If tuple is being updated by other transaction then we have to + * wait for its commit/abort, or die trying. + */ + if (TransactionIdIsValid(SnapshotDirty.xmax)) + { + ReleaseBuffer(buffer); + switch (wait_policy) + { + case LockWaitBlock: + XactLockTableWait(SnapshotDirty.xmax, + relation, &tuple.t_self, + XLTW_FetchUpdated); + break; + case LockWaitSkip: + if (!ConditionalXactLockTableWait(SnapshotDirty.xmax)) + return result; /* skip instead of waiting */ + break; + case LockWaitError: + if (!ConditionalXactLockTableWait(SnapshotDirty.xmax)) + ereport(ERROR, + (errcode(ERRCODE_LOCK_NOT_AVAILABLE), + errmsg("could not obtain lock on row in relation \"%s\"", + RelationGetRelationName(relation)))); + break; + } + continue; /* loop back to repeat heap_fetch */ + } + + /* + * If tuple was inserted by our own transaction, we have to check + * cmin against es_output_cid: cmin >= current CID means our + * command cannot see the tuple, so we should ignore it. Otherwise + * heap_lock_tuple() will throw an error, and so would any later + * attempt to update or delete the tuple. (We need not check cmax + * because HeapTupleSatisfiesDirty will consider a tuple deleted + * by our transaction dead, regardless of cmax.) We just checked + * that priorXmax == xmin, so we can test that variable instead of + * doing HeapTupleHeaderGetXmin again. + */ + if (TransactionIdIsCurrentTransactionId(priorXmax) && + HeapTupleHeaderGetCmin(tuple.t_data) >= cid) + { + ReleaseBuffer(buffer); + return result; + } + + hufd->traversed = true; + *tid = tuple.t_data->t_ctid; + ReleaseBuffer(buffer); + goto retry; + } + + /* + * If the referenced slot was actually empty, the latest version of + * the row must have been deleted, so we need do nothing. + */ + if (tuple.t_data == NULL) + { + ReleaseBuffer(buffer); + return HeapTupleDeleted; + } + + /* + * As above, if xmin isn't what we're expecting, do nothing. + */ + if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple.t_data), + priorXmax)) + { + ReleaseBuffer(buffer); + return HeapTupleDeleted; + } + + /* + * If we get here, the tuple was found but failed SnapshotDirty. + * Assuming the xmin is either a committed xact or our own xact (as it + * certainly should be if we're trying to modify the tuple), this must + * mean that the row was updated or deleted by either a committed xact + * or our own xact. If it was deleted, we can ignore it; if it was + * updated then chain up to the next version and repeat the whole + * process. + * + * As above, it should be safe to examine xmax and t_ctid without the + * buffer content lock, because they can't be changing. + */ + if (ItemPointerEquals(&tuple.t_self, &tuple.t_data->t_ctid)) + { + /* deleted, so forget about it */ + ReleaseBuffer(buffer); + return HeapTupleDeleted; + } + + /* updated, so look at the updated row */ + *tid = tuple.t_data->t_ctid; + /* updated row should have xmin matching this xmax */ + priorXmax = HeapTupleHeaderGetUpdateXid(tuple.t_data); + ReleaseBuffer(buffer); + /* loop back to fetch next in chain */ + } + } + else + { + /* tuple was deleted, so give up */ + return HeapTupleDeleted; + } + } + + Assert((flags & TUPLE_LOCK_FLAG_FIND_LAST_VERSION) || + HeapTupleSatisfies((StorageTuple) &tuple, snapshot, InvalidBuffer)); + + *stuple = heap_copytuple(&tuple); + ReleaseBuffer(buffer); + + return result; +} + Datum heapam_storage_handler(PG_FUNCTION_ARGS) @@ -325,7 +540,7 @@ heapam_storage_handler(PG_FUNCTION_ARGS) amroutine->tuple_insert = heapam_heap_insert; amroutine->tuple_delete = heapam_heap_delete; amroutine->tuple_update = heapam_heap_update; - amroutine->tuple_lock = heap_lock_tuple; + amroutine->tuple_lock = heapam_lock_tuple; amroutine->multi_insert = heap_multi_insert; amroutine->get_tuple_data = heapam_get_tuple_data; diff --git a/src/backend/access/heap/heapam_visibility.c b/src/backend/access/heap/heapam_visibility.c index daeb9bdb29..20f7d908e7 100644 --- a/src/backend/access/heap/heapam_visibility.c +++ b/src/backend/access/heap/heapam_visibility.c @@ -115,6 +115,9 @@ static inline void SetHintBits(HeapTupleHeader tuple, Buffer buffer, uint16 infomask, TransactionId xid) { + if (!BufferIsValid(buffer)) + return; + if (TransactionIdIsValid(xid)) { /* NB: xid must be known committed here! */ @@ -613,7 +616,11 @@ HeapTupleSatisfiesUpdate(StorageTuple stup, CommandId curcid, { if (HEAP_XMAX_IS_LOCKED_ONLY(tuple->t_infomask)) return HeapTupleMayBeUpdated; - return HeapTupleUpdated; /* updated by other */ + /* updated by other */ + if (ItemPointerEquals(&htup->t_self, &tuple->t_ctid)) + return HeapTupleDeleted; + else + return HeapTupleUpdated; } if (tuple->t_infomask & HEAP_XMAX_IS_MULTI) @@ -654,7 +661,12 @@ HeapTupleSatisfiesUpdate(StorageTuple stup, CommandId curcid, return HeapTupleBeingUpdated; if (TransactionIdDidCommit(xmax)) - return HeapTupleUpdated; + { + if (ItemPointerEquals(&htup->t_self, &tuple->t_ctid)) + return HeapTupleDeleted; + else + return HeapTupleUpdated; + } /* * By here, the update in the Xmax is either aborted or crashed, but @@ -710,7 +722,12 @@ HeapTupleSatisfiesUpdate(StorageTuple stup, CommandId curcid, SetHintBits(tuple, buffer, HEAP_XMAX_COMMITTED, HeapTupleHeaderGetRawXmax(tuple)); - return HeapTupleUpdated; /* updated by other */ + + /* updated by other */ + if (ItemPointerEquals(&htup->t_self, &tuple->t_ctid)) + return HeapTupleDeleted; + else + return HeapTupleUpdated; } /* diff --git a/src/backend/access/storage/storageam.c b/src/backend/access/storage/storageam.c index 239339c033..39aa55f80c 100644 --- a/src/backend/access/storage/storageam.c +++ b/src/backend/access/storage/storageam.c @@ -41,13 +41,14 @@ storage_fetch(Relation relation, * storage_lock_tuple - lock a tuple in shared or exclusive mode */ HTSU_Result -storage_lock_tuple(Relation relation, ItemPointer tid, StorageTuple * stuple, - CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, - bool follow_updates, Buffer *buffer, HeapUpdateFailureData *hufd) +storage_lock_tuple(Relation relation, ItemPointer tid, Snapshot snapshot, + StorageTuple *stuple, CommandId cid, LockTupleMode mode, + LockWaitPolicy wait_policy, uint8 flags, + HeapUpdateFailureData *hufd) { - return relation->rd_stamroutine->tuple_lock(relation, tid, stuple, + return relation->rd_stamroutine->tuple_lock(relation, tid, snapshot, stuple, cid, mode, wait_policy, - follow_updates, buffer, hufd); + flags, hufd); } /* ---------------- diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c index ca413df263..80639a1ed7 100644 --- a/src/backend/commands/trigger.c +++ b/src/backend/commands/trigger.c @@ -3019,8 +3019,6 @@ GetTupleForTrigger(EState *estate, Relation relation = relinfo->ri_RelationDesc; StorageTuple tuple; HeapTuple result; - Buffer buffer; - tuple_data t_data; if (newSlot != NULL) { @@ -3035,11 +3033,11 @@ GetTupleForTrigger(EState *estate, /* * lock tuple for update */ -ltrmark:; - test = storage_lock_tuple(relation, tid, &tuple, + test = storage_lock_tuple(relation, tid, estate->es_snapshot, &tuple, estate->es_output_cid, lockmode, LockWaitBlock, - false, &buffer, &hufd); + IsolationUsesXactSnapshot() ? 0 : TUPLE_LOCK_FLAG_FIND_LAST_VERSION, + &hufd); result = tuple; switch (test) { @@ -3060,63 +3058,54 @@ ltrmark:; errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows."))); /* treat it as deleted; do not process */ - ReleaseBuffer(buffer); return NULL; case HeapTupleMayBeUpdated: - break; - - case HeapTupleUpdated: - ReleaseBuffer(buffer); - if (IsolationUsesXactSnapshot()) - ereport(ERROR, - (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), - errmsg("could not serialize access due to concurrent update"))); - t_data = relation->rd_stamroutine->get_tuple_data(tuple, TID); - if (!ItemPointerEquals(&hufd.ctid, &(t_data.tid))) + if (hufd.traversed) { - /* it was updated, so look at the updated version */ TupleTableSlot *epqslot; epqslot = EvalPlanQual(estate, epqstate, relation, relinfo->ri_RangeTableIndex, - lockmode, - &hufd.ctid, - hufd.xmax); - if (!TupIsNull(epqslot)) - { - *tid = hufd.ctid; - *newSlot = epqslot; - - /* - * EvalPlanQual already locked the tuple, but we - * re-call heap_lock_tuple anyway as an easy way of - * re-fetching the correct tuple. Speed is hardly a - * criterion in this path anyhow. - */ - goto ltrmark; - } + tuple); + + /* If PlanQual failed for updated tuple - we must not process this tuple!*/ + if (TupIsNull(epqslot)) + return NULL; + + *newSlot = epqslot; } + break; - /* - * if tuple was deleted or PlanQual failed for updated tuple - - * we must not process this tuple! - */ + case HeapTupleUpdated: + if (IsolationUsesXactSnapshot()) + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("could not serialize access due to concurrent update"))); + elog(ERROR, "wrong heap_lock_tuple status: %u", test); + break; + + case HeapTupleDeleted: + if (IsolationUsesXactSnapshot()) + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("could not serialize access due to concurrent update"))); + /* tuple was deleted */ return NULL; case HeapTupleInvisible: elog(ERROR, "attempted to lock invisible tuple"); default: - ReleaseBuffer(buffer); elog(ERROR, "unrecognized heap_lock_tuple status: %u", test); return NULL; /* keep compiler quiet */ } } else { + Buffer buffer; Page page; ItemId lp; HeapTupleData tupledata; @@ -3146,9 +3135,9 @@ ltrmark:; LockBuffer(buffer, BUFFER_LOCK_UNLOCK); result = heap_copytuple(&tupledata); + ReleaseBuffer(buffer); } - ReleaseBuffer(buffer); return result; } diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index db9196924b..82f1d7d95b 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -2462,9 +2462,7 @@ ExecBuildAuxRowMark(ExecRowMark *erm, List *targetlist) * epqstate - state for EvalPlanQual rechecking * relation - table containing tuple * rti - rangetable index of table containing tuple - * lockmode - requested tuple lock mode - * *tid - t_ctid from the outdated tuple (ie, next updated version) - * priorXmax - t_xmax from the outdated tuple + * tuple - tuple for processing * * *tid is also an output parameter: it's modified to hold the TID of the * latest version of the tuple (note this may be changed even on failure) @@ -2477,32 +2475,12 @@ ExecBuildAuxRowMark(ExecRowMark *erm, List *targetlist) */ TupleTableSlot * EvalPlanQual(EState *estate, EPQState *epqstate, - Relation relation, Index rti, int lockmode, - ItemPointer tid, TransactionId priorXmax) + Relation relation, Index rti, StorageTuple tuple) { TupleTableSlot *slot; - StorageTuple copyTuple; - tuple_data t_data; Assert(rti > 0); - /* - * Get and lock the updated version of the row; if fail, return NULL. - */ - copyTuple = EvalPlanQualFetch(estate, relation, lockmode, LockWaitBlock, - tid, priorXmax); - - if (copyTuple == NULL) - return NULL; - - /* - * For UPDATE/DELETE we have to return tid of actual row we're executing - * PQ for. - */ - - t_data = storage_tuple_get_data(relation, copyTuple, TID); - *tid = t_data.tid; - /* * Need to run a recheck subquery. Initialize or reinitialize EPQ state. */ @@ -2512,7 +2490,7 @@ EvalPlanQual(EState *estate, EPQState *epqstate, * Free old test tuple, if any, and store new tuple where relation's scan * node will see it */ - EvalPlanQualSetTuple(epqstate, rti, copyTuple); + EvalPlanQualSetTuple(epqstate, rti, tuple); /* * Fetch any non-locked source rows @@ -2544,256 +2522,6 @@ EvalPlanQual(EState *estate, EPQState *epqstate, return slot; } -/* - * Fetch a copy of the newest version of an outdated tuple - * - * estate - executor state data - * relation - table containing tuple - * lockmode - requested tuple lock mode - * wait_policy - requested lock wait policy - * *tid - t_ctid from the outdated tuple (ie, next updated version) - * priorXmax - t_xmax from the outdated tuple - * - * Returns a palloc'd copy of the newest tuple version, or NULL if we find - * that there is no newest version (ie, the row was deleted not updated). - * We also return NULL if the tuple is locked and the wait policy is to skip - * such tuples. - * - * If successful, we have locked the newest tuple version, so caller does not - * need to worry about it changing anymore. - * - * Note: properly, lockmode should be declared as enum LockTupleMode, - * but we use "int" to avoid having to include heapam.h in executor.h. - */ -StorageTuple -EvalPlanQualFetch(EState *estate, Relation relation, int lockmode, - LockWaitPolicy wait_policy, - ItemPointer tid, TransactionId priorXmax) -{ - StorageTuple tuple = NULL; - SnapshotData SnapshotDirty; - tuple_data t_data; - - /* - * fetch target tuple - * - * Loop here to deal with updated or busy tuples - */ - InitDirtySnapshot(SnapshotDirty); - for (;;) - { - Buffer buffer; - ItemPointerData ctid; - - if (storage_fetch(relation, tid, &SnapshotDirty, &tuple, &buffer, true, NULL)) - { - HTSU_Result test; - HeapUpdateFailureData hufd; - - /* - * If xmin isn't what we're expecting, the slot must have been - * recycled and reused for an unrelated tuple. This implies that - * the latest version of the row was deleted, so we need do - * nothing. (Should be safe to examine xmin without getting - * buffer's content lock. We assume reading a TransactionId to be - * atomic, and Xmin never changes in an existing tuple, except to - * invalid or frozen, and neither of those can match priorXmax.) - */ - if (!TransactionIdEquals(HeapTupleHeaderGetXmin(((HeapTuple) tuple)->t_data), - priorXmax)) - { - ReleaseBuffer(buffer); - return NULL; - } - - /* otherwise xmin should not be dirty... */ - if (TransactionIdIsValid(SnapshotDirty.xmin)) - elog(ERROR, "t_xmin is uncommitted in tuple to be updated"); - - /* - * If tuple is being updated by other transaction then we have to - * wait for its commit/abort, or die trying. - */ - if (TransactionIdIsValid(SnapshotDirty.xmax)) - { - ReleaseBuffer(buffer); - switch (wait_policy) - { - case LockWaitBlock: - XactLockTableWait(SnapshotDirty.xmax, - relation, - tid, - XLTW_FetchUpdated); - break; - case LockWaitSkip: - if (!ConditionalXactLockTableWait(SnapshotDirty.xmax)) - return NULL; /* skip instead of waiting */ - break; - case LockWaitError: - if (!ConditionalXactLockTableWait(SnapshotDirty.xmax)) - ereport(ERROR, - (errcode(ERRCODE_LOCK_NOT_AVAILABLE), - errmsg("could not obtain lock on row in relation \"%s\"", - RelationGetRelationName(relation)))); - break; - } - continue; /* loop back to repeat heap_fetch */ - } - - /* - * If tuple was inserted by our own transaction, we have to check - * cmin against es_output_cid: cmin >= current CID means our - * command cannot see the tuple, so we should ignore it. Otherwise - * heap_lock_tuple() will throw an error, and so would any later - * attempt to update or delete the tuple. (We need not check cmax - * because HeapTupleSatisfiesDirty will consider a tuple deleted - * by our transaction dead, regardless of cmax.) We just checked - * that priorXmax == xmin, so we can test that variable instead of - * doing HeapTupleHeaderGetXmin again. - */ - if (TransactionIdIsCurrentTransactionId(priorXmax)) - { - t_data = storage_tuple_get_data(relation, tuple, CMIN); - if (t_data.cid >= estate->es_output_cid) - { - ReleaseBuffer(buffer); - return NULL; - } - } - - /* - * This is a live tuple, so now try to lock it. - */ - test = storage_lock_tuple(relation, tid, &tuple, - estate->es_output_cid, - lockmode, wait_policy, - false, &buffer, &hufd); - /* We now have two pins on the buffer, get rid of one */ - ReleaseBuffer(buffer); - - switch (test) - { - case HeapTupleSelfUpdated: - - /* - * The target tuple was already updated or deleted by the - * current command, or by a later command in the current - * transaction. We *must* ignore the tuple in the former - * case, so as to avoid the "Halloween problem" of - * repeated update attempts. In the latter case it might - * be sensible to fetch the updated tuple instead, but - * doing so would require changing heap_update and - * heap_delete to not complain about updating "invisible" - * tuples, which seems pretty scary (heap_lock_tuple will - * not complain, but few callers expect - * HeapTupleInvisible, and we're not one of them). So for - * now, treat the tuple as deleted and do not process. - */ - ReleaseBuffer(buffer); - return NULL; - - case HeapTupleMayBeUpdated: - /* successfully locked */ - break; - - case HeapTupleUpdated: - ReleaseBuffer(buffer); - if (IsolationUsesXactSnapshot()) - ereport(ERROR, - (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), - errmsg("could not serialize access due to concurrent update"))); - -#if 0 //hari - /* Should not encounter speculative tuple on recheck */ - Assert(!HeapTupleHeaderIsSpeculative(tuple.t_data)); -#endif - t_data = storage_tuple_get_data(relation, tuple, TID); - if (!ItemPointerEquals(&hufd.ctid, &t_data.tid)) - { - /* it was updated, so look at the updated version */ - *tid = hufd.ctid; - /* updated row should have xmin matching this xmax */ - priorXmax = hufd.xmax; - continue; - } - /* tuple was deleted, so give up */ - return NULL; - - case HeapTupleWouldBlock: - ReleaseBuffer(buffer); - return NULL; - - case HeapTupleInvisible: - elog(ERROR, "attempted to lock invisible tuple"); - - default: - ReleaseBuffer(buffer); - elog(ERROR, "unrecognized heap_lock_tuple status: %u", - test); - return NULL; /* keep compiler quiet */ - } - - ReleaseBuffer(buffer); - break; - } - - /* - * If the referenced slot was actually empty, the latest version of - * the row must have been deleted, so we need do nothing. - */ - if (tuple == NULL) - { - ReleaseBuffer(buffer); - return NULL; - } - - /* - * As above, if xmin isn't what we're expecting, do nothing. - */ - if (!TransactionIdEquals(HeapTupleHeaderGetXmin(((HeapTuple) tuple)->t_data), - priorXmax)) - { - ReleaseBuffer(buffer); - return NULL; - } - - /* - * If we get here, the tuple was found but failed SnapshotDirty. - * Assuming the xmin is either a committed xact or our own xact (as it - * certainly should be if we're trying to modify the tuple), this must - * mean that the row was updated or deleted by either a committed xact - * or our own xact. If it was deleted, we can ignore it; if it was - * updated then chain up to the next version and repeat the whole - * process. - * - * As above, it should be safe to examine xmax and t_ctid without the - * buffer content lock, because they can't be changing. - */ - t_data = storage_tuple_get_data(relation, tuple, CTID); - ctid = t_data.tid; - if (ItemPointerEquals(tid, &ctid)) - { - /* deleted, so forget about it */ - ReleaseBuffer(buffer); - return NULL; - } - - /* updated, so look at the updated row */ - *tid = ctid; - - /* updated row should have xmin matching this xmax */ - t_data = storage_tuple_get_data(relation, tuple, UPDATED_XID); - priorXmax = t_data.xid; - ReleaseBuffer(buffer); - /* loop back to fetch next in chain */ - } - - /* - * Return the tuple - */ - return tuple; -} - /* * EvalPlanQualInit -- initialize during creation of a plan state node * that might need to invoke EPQ processing. diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 8a658109d8..9a5e4703be 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -167,21 +167,19 @@ retry: /* Found tuple, try to lock it in the lockmode. */ if (found) { - Buffer buf; HeapUpdateFailureData hufd; HTSU_Result res; StorageTuple locktup; PushActiveSnapshot(GetLatestSnapshot()); - res = storage_lock_tuple(rel, &(outslot->tts_tid), &locktup, GetCurrentCommandId(false), + res = storage_lock_tuple(rel, &(outslot->tts_tid), GetLatestSnapshot(), + &locktup, + GetCurrentCommandId(false), lockmode, LockWaitBlock, - false /* don't follow updates */ , - &buf, &hufd); - /* the tuple slot already has the buffer pinned */ - if (BufferIsValid(buf)) - ReleaseBuffer(buf); + 0 /* don't follow updates */ , + &hufd); pfree(locktup); PopActiveSnapshot(); @@ -196,6 +194,12 @@ retry: (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("concurrent update, retrying"))); goto retry; + case HeapTupleDeleted: + /* XXX: Improve handling here */ + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("concurrent delete, retrying"))); + goto retry; case HeapTupleInvisible: elog(ERROR, "attempted to lock invisible tuple"); default: @@ -274,21 +278,19 @@ retry: /* Found tuple, try to lock it in the lockmode. */ if (found) { - Buffer buf; HeapUpdateFailureData hufd; HTSU_Result res; StorageTuple locktup; PushActiveSnapshot(GetLatestSnapshot()); - res = storage_lock_tuple(rel, &(outslot->tts_tid), &locktup, GetCurrentCommandId(false), + res = storage_lock_tuple(rel, &(outslot->tts_tid), GetLatestSnapshot(), + &locktup, + GetCurrentCommandId(false), lockmode, LockWaitBlock, - false /* don't follow updates */ , - &buf, &hufd); - /* the tuple slot already has the buffer pinned */ - if (BufferIsValid(buf)) - ReleaseBuffer(buf); + 0 /* don't follow updates */ , + &hufd); pfree(locktup); @@ -304,6 +306,12 @@ retry: (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("concurrent update, retrying"))); goto retry; + case HeapTupleDeleted: + /* XXX: Improve handling here */ + ereport(LOG, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("concurrent delete, retrying"))); + goto retry; case HeapTupleInvisible: elog(ERROR, "attempted to lock invisible tuple"); default: diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c index c8327ee1ce..f325500743 100644 --- a/src/backend/executor/nodeLockRows.c +++ b/src/backend/executor/nodeLockRows.c @@ -79,13 +79,11 @@ lnext: Datum datum; bool isNull; StorageTuple tuple; - Buffer buffer; HeapUpdateFailureData hufd; LockTupleMode lockmode; HTSU_Result test; StorageTuple copyTuple; ItemPointerData tid; - tuple_data t_data; /* clear any leftover test tuple for this rel */ testTuple = (StorageTuple) (&(node->lr_curtuples[erm->rti - 1])); @@ -183,12 +181,12 @@ lnext: break; } - test = storage_lock_tuple(erm->relation, &tid, &tuple, - estate->es_output_cid, - lockmode, erm->waitPolicy, true, - &buffer, &hufd); - if (BufferIsValid(buffer)) - ReleaseBuffer(buffer); + test = storage_lock_tuple(erm->relation, &tid, estate->es_snapshot, + &tuple, estate->es_output_cid, + lockmode, erm->waitPolicy, + (IsolationUsesXactSnapshot() ? 0 : TUPLE_LOCK_FLAG_FIND_LAST_VERSION) + | TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS, + &hufd); switch (test) { @@ -216,6 +214,16 @@ lnext: case HeapTupleMayBeUpdated: /* got the lock successfully */ + if (hufd.traversed) + { + /* Save locked tuple for EvalPlanQual testing below */ + *testTuple = tuple; + + /* Remember we need to do EPQ testing */ + epq_needed = true; + + /* Continue loop until we have all target tuples */ + } break; case HeapTupleUpdated: @@ -223,38 +231,19 @@ lnext: ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to concurrent update"))); - t_data = erm->relation->rd_stamroutine->get_tuple_data(tuple, TID); - if (ItemPointerEquals(&hufd.ctid, &(t_data.tid))) - { - /* Tuple was deleted, so don't return it */ - goto lnext; - } - - /* updated, so fetch and lock the updated version */ - copyTuple = EvalPlanQualFetch(estate, erm->relation, - lockmode, erm->waitPolicy, - &hufd.ctid, hufd.xmax); - - if (copyTuple == NULL) - { - /* - * Tuple was deleted; or it's locked and we're under SKIP - * LOCKED policy, so don't return it - */ - goto lnext; - } - /* remember the actually locked tuple's TID */ - t_data = erm->relation->rd_stamroutine->get_tuple_data(copyTuple, TID); - tid = t_data.tid; - - /* Save locked tuple for EvalPlanQual testing below */ - *testTuple = copyTuple; - - /* Remember we need to do EPQ testing */ - epq_needed = true; + /* skip lock */ + goto lnext; - /* Continue loop until we have all target tuples */ - break; + case HeapTupleDeleted: + if (IsolationUsesXactSnapshot()) + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("could not serialize access due to concurrent update"))); + /* + * Tuple was deleted; or it's locked and we're under SKIP + * LOCKED policy, so don't return it + */ + goto lnext; case HeapTupleInvisible: elog(ERROR, "attempted to lock invisible tuple"); diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index c8cc1e0f12..5352cf903a 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -203,7 +203,8 @@ ExecCheckHeapTupleVisible(EState *estate, * We need buffer pin and lock to call HeapTupleSatisfiesVisibility. * Caller should be holding pin, but not lock. */ - LockBuffer(buffer, BUFFER_LOCK_SHARE); + if (BufferIsValid(buffer)) + LockBuffer(buffer, BUFFER_LOCK_SHARE); if (!HeapTupleSatisfiesVisibility(rel->rd_stamroutine, tuple, estate->es_snapshot, buffer)) { tuple_data t_data = storage_tuple_get_data(rel, tuple, XMIN); @@ -219,7 +220,8 @@ ExecCheckHeapTupleVisible(EState *estate, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to concurrent update"))); } - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + if (BufferIsValid(buffer)) + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); } /* @@ -678,6 +680,7 @@ ExecDelete(ModifyTableState *mtstate, HTSU_Result result; HeapUpdateFailureData hufd; TupleTableSlot *slot = NULL; + StorageTuple tuple; /* * get information on the (current) result relation @@ -760,6 +763,35 @@ ldelete:; true /* wait for commit */ , NULL, &hufd); + + if (result == HeapTupleUpdated && !IsolationUsesXactSnapshot()) + { + result = storage_lock_tuple(resultRelationDesc, tupleid, + estate->es_snapshot, + &tuple, estate->es_output_cid, + LockTupleExclusive, LockWaitBlock, + TUPLE_LOCK_FLAG_FIND_LAST_VERSION, + &hufd); + + Assert(result != HeapTupleUpdated && hufd.traversed); + if (result == HeapTupleMayBeUpdated) + { + TupleTableSlot *epqslot; + + epqslot = EvalPlanQual(estate, + epqstate, + resultRelationDesc, + resultRelInfo->ri_RangeTableIndex, + tuple); + if (TupIsNull(epqslot)) + { + /* Tuple no more passing quals, exiting... */ + return NULL; + } + goto ldelete; + } + } + switch (result) { case HeapTupleSelfUpdated: @@ -805,23 +837,16 @@ ldelete:; ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to concurrent update"))); - if (!ItemPointerEquals(tupleid, &hufd.ctid)) - { - TupleTableSlot *epqslot; - - epqslot = EvalPlanQual(estate, - epqstate, - resultRelationDesc, - resultRelInfo->ri_RangeTableIndex, - LockTupleExclusive, - &hufd.ctid, - hufd.xmax); - if (!TupIsNull(epqslot)) - { - *tupleid = hufd.ctid; - goto ldelete; - } - } + else + /* shouldn't get there */ + elog(ERROR, "wrong heap_delete status: %u", result); + break; + + case HeapTupleDeleted: + if (IsolationUsesXactSnapshot()) + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("could not serialize access due to concurrent delete"))); /* tuple already deleted; nothing to do */ return NULL; @@ -1060,6 +1085,37 @@ lreplace:; &hufd, &lockmode, ExecInsertIndexTuples, &recheckIndexes); + + if (result == HeapTupleUpdated && !IsolationUsesXactSnapshot()) + { + result = storage_lock_tuple(resultRelationDesc, tupleid, + estate->es_snapshot, + &tuple, estate->es_output_cid, + lockmode, LockWaitBlock, + TUPLE_LOCK_FLAG_FIND_LAST_VERSION, + &hufd); + + Assert(result != HeapTupleUpdated && hufd.traversed); + if (result == HeapTupleMayBeUpdated) + { + TupleTableSlot *epqslot; + + epqslot = EvalPlanQual(estate, + epqstate, + resultRelationDesc, + resultRelInfo->ri_RangeTableIndex, + tuple); + if (TupIsNull(epqslot)) + { + /* Tuple no more passing quals, exiting... */ + return NULL; + } + slot = ExecFilterJunk(resultRelInfo->ri_junkFilter, epqslot); + tuple = ExecHeapifySlot(slot); + goto lreplace; + } + } + switch (result) { case HeapTupleSelfUpdated: @@ -1104,25 +1160,16 @@ lreplace:; ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to concurrent update"))); - if (!ItemPointerEquals(tupleid, &hufd.ctid)) - { - TupleTableSlot *epqslot; - - epqslot = EvalPlanQual(estate, - epqstate, - resultRelationDesc, - resultRelInfo->ri_RangeTableIndex, - lockmode, - &hufd.ctid, - hufd.xmax); - if (!TupIsNull(epqslot)) - { - *tupleid = hufd.ctid; - slot = ExecFilterJunk(resultRelInfo->ri_junkFilter, epqslot); - tuple = ExecHeapifySlot(slot); - goto lreplace; - } - } + else + /* shouldn't get there */ + elog(ERROR, "wrong heap_delete status: %u", result); + break; + + case HeapTupleDeleted: + if (IsolationUsesXactSnapshot()) + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("could not serialize access due to concurrent delete"))); /* tuple already deleted; nothing to do */ return NULL; @@ -1191,8 +1238,8 @@ ExecOnConflictUpdate(ModifyTableState *mtstate, HeapUpdateFailureData hufd; LockTupleMode lockmode; HTSU_Result test; - Buffer buffer; tuple_data t_data; + SnapshotData snapshot; /* Determine lock mode to use */ lockmode = ExecUpdateLockMode(estate, resultRelInfo); @@ -1203,8 +1250,12 @@ ExecOnConflictUpdate(ModifyTableState *mtstate, * previous conclusion that the tuple is conclusively committed is not * true anymore. */ - test = storage_lock_tuple(relation, conflictTid, &tuple, estate->es_output_cid, - lockmode, LockWaitBlock, false, &buffer, &hufd); + InitDirtySnapshot(snapshot); + test = storage_lock_tuple(relation, conflictTid, + &snapshot, + /*estate->es_snapshot,*/ + &tuple, estate->es_output_cid, + lockmode, LockWaitBlock, 0, &hufd); switch (test) { case HeapTupleMayBeUpdated: @@ -1261,8 +1312,15 @@ ExecOnConflictUpdate(ModifyTableState *mtstate, * loop here, as the new version of the row might not conflict * anymore, or the conflicting tuple has actually been deleted. */ - if (BufferIsValid(buffer)) - ReleaseBuffer(buffer); + pfree(tuple); + return false; + + case HeapTupleDeleted: + if (IsolationUsesXactSnapshot()) + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("could not serialize access due to concurrent delete"))); + pfree(tuple); return false; @@ -1291,10 +1349,10 @@ ExecOnConflictUpdate(ModifyTableState *mtstate, * snapshot. This is in line with the way UPDATE deals with newer tuple * versions. */ - ExecCheckHeapTupleVisible(estate, relation, tuple, buffer); + ExecCheckHeapTupleVisible(estate, relation, tuple, InvalidBuffer); /* Store target's existing tuple in the state's dedicated slot */ - ExecStoreTuple(tuple, mtstate->mt_existing, buffer, false); + ExecStoreTuple(tuple, mtstate->mt_existing, InvalidBuffer, false); /* * Make tuple and any needed join variables available to ExecQual and @@ -1309,8 +1367,6 @@ ExecOnConflictUpdate(ModifyTableState *mtstate, if (!ExecQual(onConflictSetWhere, econtext)) { - if (BufferIsValid(buffer)) - ReleaseBuffer(buffer); pfree(tuple); InstrCountFiltered1(&mtstate->ps, 1); return true; /* done with the tuple */ @@ -1356,8 +1412,6 @@ ExecOnConflictUpdate(ModifyTableState *mtstate, &mtstate->mt_epqstate, mtstate->ps.state, canSetTag); - if (BufferIsValid(buffer)) - ReleaseBuffer(buffer); pfree(tuple); return true; } diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 58a955be4f..27399cf817 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -69,6 +69,7 @@ typedef struct HeapUpdateFailureData ItemPointerData ctid; TransactionId xmax; CommandId cmax; + bool traversed; } HeapUpdateFailureData; @@ -162,10 +163,10 @@ extern HTSU_Result heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, CommandId cid, Snapshot crosscheck, bool wait, HeapUpdateFailureData *hufd, LockTupleMode *lockmode); -extern HTSU_Result heap_lock_tuple(Relation relation, ItemPointer tid, StorageTuple * tuple, - CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, - bool follow_update, - Buffer *buffer, HeapUpdateFailureData *hufd); +extern HTSU_Result heap_lock_tuple(Relation relation, HeapTuple tuple, + CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, + bool follow_updates, + Buffer *buffer, HeapUpdateFailureData *hufd); extern void heap_inplace_update(Relation relation, HeapTuple tuple); extern bool heap_freeze_tuple(HeapTupleHeader tuple, diff --git a/src/include/access/storageam.h b/src/include/access/storageam.h index c7cd01cd10..3e151a70ca 100644 --- a/src/include/access/storageam.h +++ b/src/include/access/storageam.h @@ -87,10 +87,10 @@ extern bool storage_hot_search_buffer(ItemPointer tid, Relation relation, Buffer extern bool storage_hot_search(ItemPointer tid, Relation relation, Snapshot snapshot, bool *all_dead); -extern HTSU_Result storage_lock_tuple(Relation relation, ItemPointer tid, StorageTuple * stuple, - CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, - bool follow_updates, - Buffer *buffer, HeapUpdateFailureData *hufd); +extern HTSU_Result storage_lock_tuple(Relation relation, ItemPointer tid, Snapshot snapshot, + StorageTuple *stuple, CommandId cid, LockTupleMode mode, + LockWaitPolicy wait_policy, uint8 flags, + HeapUpdateFailureData *hufd); extern Oid storage_insert(Relation relation, TupleTableSlot *slot, CommandId cid, int options, BulkInsertState bistate, InsertIndexTuples IndexFunc, diff --git a/src/include/access/storageamapi.h b/src/include/access/storageamapi.h index b7a0d5fb07..4c39de6f31 100644 --- a/src/include/access/storageamapi.h +++ b/src/include/access/storageamapi.h @@ -61,12 +61,12 @@ typedef bool (*TupleFetch_function) (Relation relation, typedef HTSU_Result (*TupleLock_function) (Relation relation, ItemPointer tid, - StorageTuple * tuple, + Snapshot snapshot, + StorageTuple *tuple, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, - bool follow_update, - Buffer *buffer, + uint8 flags, HeapUpdateFailureData *hufd); typedef void (*MultiInsert_function) (Relation relation, HeapTuple *tuples, int ntuples, diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 612d468f1f..b4cb83dd25 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -196,11 +196,7 @@ extern LockTupleMode ExecUpdateLockMode(EState *estate, ResultRelInfo *relinfo); extern ExecRowMark *ExecFindRowMark(EState *estate, Index rti, bool missing_ok); extern ExecAuxRowMark *ExecBuildAuxRowMark(ExecRowMark *erm, List *targetlist); extern TupleTableSlot *EvalPlanQual(EState *estate, EPQState *epqstate, - Relation relation, Index rti, int lockmode, - ItemPointer tid, TransactionId priorXmax); -extern StorageTuple EvalPlanQualFetch(EState *estate, Relation relation, - int lockmode, LockWaitPolicy wait_policy, ItemPointer tid, - TransactionId priorXmax); + Relation relation, Index rti, StorageTuple tuple); extern void EvalPlanQualInit(EPQState *epqstate, EState *estate, Plan *subplan, List *auxrowmarks, int epqParam); extern void EvalPlanQualSetPlan(EPQState *epqstate, diff --git a/src/include/nodes/lockoptions.h b/src/include/nodes/lockoptions.h index 24afd6efd4..bcde234614 100644 --- a/src/include/nodes/lockoptions.h +++ b/src/include/nodes/lockoptions.h @@ -43,4 +43,9 @@ typedef enum LockWaitPolicy LockWaitError } LockWaitPolicy; +/* Follow tuples whose update is in progress if lock modes don't conflict */ +#define TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS 0x01 +/* Follow update chain and lock lastest version of tuple */ +#define TUPLE_LOCK_FLAG_FIND_LAST_VERSION 0x02 + #endif /* LOCKOPTIONS_H */ diff --git a/src/include/utils/snapshot.h b/src/include/utils/snapshot.h index ca96fd00fa..95a91db03c 100644 --- a/src/include/utils/snapshot.h +++ b/src/include/utils/snapshot.h @@ -136,6 +136,7 @@ typedef enum HeapTupleInvisible, HeapTupleSelfUpdated, HeapTupleUpdated, + HeapTupleDeleted, HeapTupleBeingUpdated, HeapTupleWouldBlock /* can be returned by heap_tuple_lock */ } HTSU_Result; -- 2.15.0.windows.1