From 12376fa6311c435c33b2cd8464b6fca0390e9ab2 Mon Sep 17 00:00:00 2001 From: Alexander Korotkov Date: Thu, 30 Jun 2022 22:07:12 +0300 Subject: [PATCH] Allow locking updated tuples in tuple_update() and tuple_delete() Currently, in read committed transaction isolation mode (default), we have the following sequence of actions when tuple_update()/tuple_delete() finds the tuple updated by a concurrent transaction. 1. Attempt to update/delete tuple with tuple_update()/tuple_delete(), which returns TM_Updated. 2. Lock tuple with tuple_lock(). 3. Re-evaluate plan qual (recheck if we still need to update/delete and calculate the new tuple for update). 4. Second attempt to update/delete tuple with tuple_update()/tuple_delete(). This attempt should be successful, since the tuple was previously locked. This patch eliminates step 2 by taking the lock during the first tuple_update()/tuple_delete() call. Heap table access methods can save effort by traversing the chain of updated tuples once instead of twice. Future undo-based table access methods, which will start from the latest row version, can immediately place a lock there. The code in nodeModifyTable.c is simplified by removing the nested switch/case. 
Discussion: https://postgr.es/m/CAPpHfdua-YFw3XTprfutzGp28xXLigFtzNbuFY8yPhqeq6X5kg%40mail.gmail.com Reviewed-by: Aleksander Alekseev, Pavel Borisov, Vignesh C, Mason Sharp --- src/backend/access/heap/heapam.c | 117 ++++++++---- src/backend/access/heap/heapam_handler.c | 50 +++++- src/backend/access/table/tableam.c | 6 +- src/backend/executor/nodeModifyTable.c | 215 ++++++++--------------- src/include/access/heapam.h | 26 ++- src/include/access/tableam.h | 32 +++- 6 files changed, 252 insertions(+), 194 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 388df94a442..26623d8e25e 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2667,7 +2667,8 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask) TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, - TM_FailureData *tmfd, bool changingPart) + TM_FailureData *tmfd, bool changingPart, Snapshot snapshot, + GetSlotCallback lockedSlotCallback, void *lockedSlotCallbackArg) { TM_Result result; TransactionId xid = GetCurrentTransactionId(); @@ -2865,6 +2866,26 @@ l1: result = TM_Updated; } + if (result == TM_Updated && lockedSlotCallback) + { + HeapLockContext context = {buffer, vmbuffer, have_tuple_lock}; + TupleTableSlot *slot; + + slot = lockedSlotCallback(lockedSlotCallbackArg); + + result = heapam_tuple_lock_internal(relation, tid, snapshot, + slot, cid, LockTupleExclusive, + wait ? 
LockWaitBlock : LockWaitError, + TUPLE_LOCK_FLAG_FIND_LAST_VERSION, + tmfd, &context); + if (result == TM_Ok) + { + tmfd->traversed = true; + return TM_Updated; + } + return result; + } + if (result != TM_Ok) { Assert(result == TM_SelfModified || @@ -3088,7 +3109,8 @@ simple_heap_delete(Relation relation, ItemPointer tid) result = heap_delete(relation, tid, GetCurrentCommandId(true), InvalidSnapshot, true /* wait for commit */ , - &tmfd, false /* changingPart */ ); + &tmfd, false /* changingPart */ , + SnapshotAny, NULL, NULL); switch (result) { case TM_SelfModified: @@ -3128,7 +3150,8 @@ simple_heap_delete(Relation relation, ItemPointer tid) TM_Result heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, CommandId cid, Snapshot crosscheck, bool wait, - TM_FailureData *tmfd, LockTupleMode *lockmode) + TM_FailureData *tmfd, LockTupleMode *lockmode, Snapshot snapshot, + GetSlotCallback lockedSlotCallback, void *lockedSlotCallbackArg) { TM_Result result; TransactionId xid = GetCurrentTransactionId(); @@ -3495,6 +3518,31 @@ l2: } } + if (result == TM_Updated && lockedSlotCallback) + { + HeapLockContext context = {buffer, vmbuffer, have_tuple_lock}; + TupleTableSlot *slot; + + slot = lockedSlotCallback(lockedSlotCallbackArg); + + result = heapam_tuple_lock_internal(relation, otid, snapshot, + slot, cid, *lockmode, + wait ? 
LockWaitBlock : LockWaitError, + TUPLE_LOCK_FLAG_FIND_LAST_VERSION, + tmfd, &context); + bms_free(hot_attrs); + bms_free(key_attrs); + bms_free(id_attrs); + bms_free(modified_attrs); + bms_free(interesting_attrs); + if (result == TM_Ok) + { + tmfd->traversed = true; + return TM_Updated; + } + return result; + } + if (result != TM_Ok) { Assert(result == TM_SelfModified || @@ -4173,7 +4221,7 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup) result = heap_update(relation, otid, tup, GetCurrentCommandId(true), InvalidSnapshot, true /* wait for commit */ , - &tmfd, &lockmode); + &tmfd, &lockmode, SnapshotAny, NULL, NULL); switch (result) { case TM_SelfModified: @@ -4255,13 +4303,14 @@ TM_Result heap_lock_tuple(Relation relation, HeapTuple tuple, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, bool follow_updates, - Buffer *buffer, TM_FailureData *tmfd) + HeapLockContext *context, TM_FailureData *tmfd) { TM_Result result; ItemPointer tid = &(tuple->t_self); ItemId lp; Page page; - Buffer vmbuffer = InvalidBuffer; + Buffer buffer = context->buffer, + vmbuffer = context->vmbuffer; BlockNumber block; TransactionId xid, xmax; @@ -4270,10 +4319,11 @@ heap_lock_tuple(Relation relation, HeapTuple tuple, new_infomask2; bool first_time = true; bool skip_tuple_lock = false; - bool have_tuple_lock = false; + bool have_tuple_lock = context->have_tuple_lock; bool cleared_all_frozen = false; - *buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); + if (BufferIsInvalid(buffer)) + buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); block = ItemPointerGetBlockNumber(tid); /* @@ -4282,12 +4332,13 @@ heap_lock_tuple(Relation relation, HeapTuple tuple, * in the middle of changing this, so we'll need to recheck after we have * the lock. 
*/ - if (PageIsAllVisible(BufferGetPage(*buffer))) + if (BufferIsInvalid(vmbuffer) && PageIsAllVisible(BufferGetPage(buffer))) visibilitymap_pin(relation, block, &vmbuffer); - LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); + if (BufferIsInvalid(context->buffer)) + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); - page = BufferGetPage(*buffer); + page = BufferGetPage(buffer); lp = PageGetItemId(page, ItemPointerGetOffsetNumber(tid)); Assert(ItemIdIsNormal(lp)); @@ -4296,7 +4347,7 @@ heap_lock_tuple(Relation relation, HeapTuple tuple, tuple->t_tableOid = RelationGetRelid(relation); l3: - result = HeapTupleSatisfiesUpdate(tuple, cid, *buffer); + result = HeapTupleSatisfiesUpdate(tuple, cid, buffer); if (result == TM_Invisible) { @@ -4325,7 +4376,7 @@ l3: infomask2 = tuple->t_data->t_infomask2; ItemPointerCopy(&tuple->t_data->t_ctid, &t_ctid); - LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); /* * If any subtransaction of the current top transaction already holds @@ -4477,12 +4528,12 @@ l3: { result = res; /* recovery code expects to have buffer lock held */ - LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); goto failed; } } - LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); /* * Make sure it's still an appropriate lock, else start over. @@ -4517,7 +4568,7 @@ l3: if (HEAP_XMAX_IS_LOCKED_ONLY(infomask) && !HEAP_XMAX_IS_EXCL_LOCKED(infomask)) { - LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); /* * Make sure it's still an appropriate lock, else start over. @@ -4545,7 +4596,7 @@ l3: * No conflict, but if the xmax changed under us in the * meantime, start over. 
*/ - LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) || !TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data), xwait)) @@ -4557,7 +4608,7 @@ l3: } else if (HEAP_XMAX_IS_KEYSHR_LOCKED(infomask)) { - LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); /* if the xmax changed in the meantime, start over */ if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) || @@ -4585,7 +4636,7 @@ l3: TransactionIdIsCurrentTransactionId(xwait)) { /* ... but if the xmax changed in the meantime, start over */ - LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); if (xmax_infomask_changed(tuple->t_data->t_infomask, infomask) || !TransactionIdEquals(HeapTupleHeaderGetRawXmax(tuple->t_data), xwait)) @@ -4607,7 +4658,7 @@ l3: */ if (require_sleep && (result == TM_Updated || result == TM_Deleted)) { - LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); goto failed; } else if (require_sleep) @@ -4632,7 +4683,7 @@ l3: */ result = TM_WouldBlock; /* recovery code expects to have buffer lock held */ - LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); goto failed; } @@ -4658,7 +4709,7 @@ l3: { result = TM_WouldBlock; /* recovery code expects to have buffer lock held */ - LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); goto failed; } break; @@ -4698,7 +4749,7 @@ l3: { result = TM_WouldBlock; /* recovery code expects to have buffer lock held */ - LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); goto failed; } break; @@ -4724,12 +4775,12 @@ l3: { result = res; /* recovery code expects to have buffer lock held */ - LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); goto failed; } } - LockBuffer(*buffer, 
BUFFER_LOCK_EXCLUSIVE); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); /* * xwait is done, but if xwait had just locked the tuple then some @@ -4751,7 +4802,7 @@ l3: * don't check for this in the multixact case, because some * locker transactions might still be running. */ - UpdateXmaxHintBits(tuple->t_data, *buffer, xwait); + UpdateXmaxHintBits(tuple->t_data, buffer, xwait); } } @@ -4810,9 +4861,9 @@ failed: */ if (vmbuffer == InvalidBuffer && PageIsAllVisible(page)) { - LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); visibilitymap_pin(relation, block, &vmbuffer); - LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); goto l3; } @@ -4875,7 +4926,7 @@ failed: cleared_all_frozen = true; - MarkBufferDirty(*buffer); + MarkBufferDirty(buffer); /* * XLOG stuff. You might think that we don't need an XLOG record because @@ -4895,7 +4946,7 @@ failed: XLogRecPtr recptr; XLogBeginInsert(); - XLogRegisterBuffer(0, *buffer, REGBUF_STANDARD); + XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); xlrec.offnum = ItemPointerGetOffsetNumber(&tuple->t_self); xlrec.locking_xid = xid; @@ -4916,7 +4967,7 @@ failed: result = TM_Ok; out_locked: - LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); out_unlocked: if (BufferIsValid(vmbuffer)) @@ -4934,6 +4985,10 @@ out_unlocked: if (have_tuple_lock) UnlockTupleTuplock(relation, tid, mode); + context->buffer = buffer; + context->vmbuffer = InvalidBuffer; + context->have_tuple_lock = false; + return result; } diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index c4b1916d36e..6f024b5cccd 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -299,14 +299,21 @@ heapam_tuple_complete_speculative(Relation relation, TupleTableSlot *slot, static TM_Result heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot snapshot, Snapshot 
crosscheck, bool wait, - TM_FailureData *tmfd, bool changingPart) + TM_FailureData *tmfd, bool changingPart, + GetSlotCallback lockedSlotCallback, + void *lockedSlotCallbackArg) { + TM_Result result; + /* * Currently Deleting of index tuples are handled at vacuum, in case if * the storage itself is cleaning the dead tuples by itself, it is the * time to call the index tuple deletion also. */ - return heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart); + result = heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart, + snapshot, lockedSlotCallback, lockedSlotCallbackArg); + + return result; } @@ -314,7 +321,9 @@ static TM_Result heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot, CommandId cid, Snapshot snapshot, Snapshot crosscheck, bool wait, TM_FailureData *tmfd, - LockTupleMode *lockmode, bool *update_indexes) + LockTupleMode *lockmode, bool *update_indexes, + GetSlotCallback lockedSlotCallback, + void *lockedSlotCallbackArg) { bool shouldFree = true; HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); @@ -325,7 +334,7 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot, tuple->t_tableOid = slot->tts_tableOid; result = heap_update(relation, otid, tuple, cid, crosscheck, wait, - tmfd, lockmode); + tmfd, lockmode, snapshot, lockedSlotCallback, lockedSlotCallbackArg); ItemPointerCopy(&tuple->t_self, &slot->tts_tid); /* @@ -349,12 +358,27 @@ heapam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot, TupleTableSlot *slot, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, uint8 flags, TM_FailureData *tmfd) +{ + return heapam_tuple_lock_internal(relation, tid, snapshot, slot, cid, mode, + wait_policy, flags, tmfd, NULL); +} + +/* + * This routine does the work for heapam_tuple_lock(), but also support + * `updated` to re-use the work done by heapam_tuple_update() or + * heapam_tuple_delete() on fetching tuple and checking its visibility. 
+ */ +TM_Result +heapam_tuple_lock_internal(Relation relation, ItemPointer tid, Snapshot snapshot, + TupleTableSlot *slot, CommandId cid, LockTupleMode mode, + LockWaitPolicy wait_policy, uint8 flags, + TM_FailureData *tmfd, HeapLockContext *existingContext) { BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot; TM_Result result; - Buffer buffer; HeapTuple tuple = &bslot->base.tupdata; bool follow_updates; + Buffer buffer = InvalidBuffer; follow_updates = (flags & TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS) != 0; tmfd->traversed = false; @@ -363,8 +387,20 @@ heapam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot, tuple_lock_retry: tuple->t_self = *tid; - result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy, - follow_updates, &buffer, tmfd); + if (!existingContext) + { + HeapLockContext context = {InvalidBuffer, InvalidBuffer, false}; + result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy, + follow_updates, &context, tmfd); + buffer = context.buffer; + } + else + { + result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy, + follow_updates, existingContext, tmfd); + buffer = existingContext->buffer; + existingContext = NULL; + } if (result == TM_Updated && (flags & TUPLE_LOCK_FLAG_FIND_LAST_VERSION)) diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index ef0d34fceee..a87b86ff614 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -306,7 +306,8 @@ simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot) GetCurrentCommandId(true), snapshot, InvalidSnapshot, true /* wait for commit */ , - &tmfd, false /* changingPart */ ); + &tmfd, false /* changingPart */ , + NULL, NULL); switch (result) { @@ -355,7 +356,8 @@ simple_table_tuple_update(Relation rel, ItemPointer otid, GetCurrentCommandId(true), snapshot, InvalidSnapshot, true /* wait for commit */ , - &tmfd, &lockmode, update_indexes); + &tmfd, &lockmode, 
update_indexes, + NULL, NULL); switch (result) { diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 687a5422eab..dfefa2cdadb 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -1326,6 +1326,23 @@ ExecDeletePrologue(ModifyTableContext *context, ResultRelInfo *resultRelInfo, return true; } +typedef struct +{ + EPQState *epqstate; + ResultRelInfo *resultRelInfo; +} GetEPQSlotArg; + + +static TupleTableSlot * +GetEPQSlot(void *arg) +{ + GetEPQSlotArg *slotArg = (GetEPQSlotArg *) arg; + + return EvalPlanQualSlot(slotArg->epqstate, + slotArg->resultRelInfo->ri_RelationDesc, + slotArg->resultRelInfo->ri_RangeTableIndex); +} + /* * ExecDeleteAct -- subroutine for ExecDelete * @@ -1338,6 +1355,7 @@ ExecDeleteAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo, ItemPointer tupleid, bool changingPart) { EState *estate = context->estate; + GetEPQSlotArg slotArg = {.epqstate = context->epqstate, .resultRelInfo = resultRelInfo}; return table_tuple_delete(resultRelInfo->ri_RelationDesc, tupleid, estate->es_output_cid, @@ -1345,7 +1363,9 @@ ExecDeleteAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo, estate->es_crosscheck_snapshot, true /* wait for commit */ , &context->tmfd, - changingPart); + changingPart, + GetEPQSlot, + &slotArg); } /* @@ -1543,87 +1563,33 @@ ldelete: errmsg("could not serialize access due to concurrent update"))); /* - * Already know that we're going to need to do EPQ, so - * fetch tuple directly into the right slot. + * ExecDeleteAct() has already locked the old tuple for + * us. Now we need to copy it to the right slot. 
*/ EvalPlanQualBegin(context->epqstate); inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc, resultRelInfo->ri_RangeTableIndex); - result = table_tuple_lock(resultRelationDesc, tupleid, - estate->es_snapshot, - inputslot, estate->es_output_cid, - LockTupleExclusive, LockWaitBlock, - TUPLE_LOCK_FLAG_FIND_LAST_VERSION, - &context->tmfd); + Assert(context->tmfd.traversed); + epqslot = EvalPlanQual(context->epqstate, + resultRelationDesc, + resultRelInfo->ri_RangeTableIndex, + inputslot); + if (TupIsNull(epqslot)) + /* Tuple not passing quals anymore, exiting... */ + return NULL; - switch (result) + /* + * If requested, skip delete and pass back the updated + * row. + */ + if (epqreturnslot) { - case TM_Ok: - Assert(context->tmfd.traversed); - epqslot = EvalPlanQual(context->epqstate, - resultRelationDesc, - resultRelInfo->ri_RangeTableIndex, - inputslot); - if (TupIsNull(epqslot)) - /* Tuple not passing quals anymore, exiting... */ - return NULL; - - /* - * If requested, skip delete and pass back the - * updated row. - */ - if (epqreturnslot) - { - *epqreturnslot = epqslot; - return NULL; - } - else - goto ldelete; - - case TM_SelfModified: - - /* - * This can be reached when following an update - * chain from a tuple updated by another session, - * reaching a tuple that was already updated in - * this transaction. If previously updated by this - * command, ignore the delete, otherwise error - * out. - * - * See also TM_SelfModified response to - * table_tuple_delete() above. 
- */ - if (context->tmfd.cmax != estate->es_output_cid) - ereport(ERROR, - (errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION), - errmsg("tuple to be deleted was already modified by an operation triggered by the current command"), - errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows."))); - return NULL; - - case TM_Deleted: - /* tuple already deleted; nothing to do */ - return NULL; - - default: - - /* - * TM_Invisible should be impossible because we're - * waiting for updated row versions, and would - * already have errored out if the first version - * is invisible. - * - * TM_Updated should be impossible, because we're - * locking the latest version via - * TUPLE_LOCK_FLAG_FIND_LAST_VERSION. - */ - elog(ERROR, "unexpected table_tuple_lock status: %u", - result); - return NULL; + *epqreturnslot = epqslot; + return NULL; } - - Assert(false); - break; + else + goto ldelete; } case TM_Deleted: @@ -1961,6 +1927,7 @@ ExecUpdateAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo, Relation resultRelationDesc = resultRelInfo->ri_RelationDesc; bool partition_constraint_failed; TM_Result result; + GetEPQSlotArg slotArg = {.epqstate = context->epqstate, .resultRelInfo = resultRelInfo}; updateCxt->crossPartUpdate = false; @@ -2089,7 +2056,9 @@ lreplace: estate->es_crosscheck_snapshot, true /* wait for commit */ , &context->tmfd, &updateCxt->lockmode, - &updateCxt->updateIndexes); + &updateCxt->updateIndexes, + GetEPQSlot, + &slotArg); if (result == TM_Ok) updateCxt->updated = true; @@ -2240,7 +2209,7 @@ ExecCrossPartitionUpdateForeignKey(ModifyTableContext *context, static TupleTableSlot * ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo, ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *slot, - bool canSetTag) + bool canSetTag, bool locked) { EState *estate = context->estate; Relation resultRelationDesc = resultRelInfo->ri_RelationDesc; @@ -2357,80 +2326,38 @@ redo_act: ereport(ERROR, 
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to concurrent update"))); + Assert(!locked); /* - * Already know that we're going to need to do EPQ, so - * fetch tuple directly into the right slot. + * ExecUpdateAct() has already locked the old tuple for + * us. Now we need to copy it to the right slot. */ inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc, resultRelInfo->ri_RangeTableIndex); - result = table_tuple_lock(resultRelationDesc, tupleid, - estate->es_snapshot, - inputslot, estate->es_output_cid, - updateCxt.lockmode, LockWaitBlock, - TUPLE_LOCK_FLAG_FIND_LAST_VERSION, - &context->tmfd); - - switch (result) - { - case TM_Ok: - Assert(context->tmfd.traversed); - - epqslot = EvalPlanQual(context->epqstate, - resultRelationDesc, - resultRelInfo->ri_RangeTableIndex, - inputslot); - if (TupIsNull(epqslot)) - /* Tuple not passing quals anymore, exiting... */ - return NULL; - - /* Make sure ri_oldTupleSlot is initialized. */ - if (unlikely(!resultRelInfo->ri_projectNewInfoValid)) - ExecInitUpdateProjection(context->mtstate, - resultRelInfo); - - /* Fetch the most recent version of old tuple. */ - oldSlot = resultRelInfo->ri_oldTupleSlot; - if (!table_tuple_fetch_row_version(resultRelationDesc, - tupleid, - SnapshotAny, - oldSlot)) - elog(ERROR, "failed to fetch tuple being updated"); - slot = ExecGetUpdateNewTuple(resultRelInfo, - epqslot, oldSlot); - goto redo_act; - - case TM_Deleted: - /* tuple already deleted; nothing to do */ - return NULL; - - case TM_SelfModified: + epqslot = EvalPlanQual(context->epqstate, + resultRelationDesc, + resultRelInfo->ri_RangeTableIndex, + inputslot); + if (TupIsNull(epqslot)) + /* Tuple not passing quals anymore, exiting... */ + return NULL; - /* - * This can be reached when following an update - * chain from a tuple updated by another session, - * reaching a tuple that was already updated in - * this transaction. 
If previously modified by - * this command, ignore the redundant update, - * otherwise error out. - * - * See also TM_SelfModified response to - * table_tuple_update() above. - */ - if (context->tmfd.cmax != estate->es_output_cid) - ereport(ERROR, - (errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION), - errmsg("tuple to be updated was already modified by an operation triggered by the current command"), - errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows."))); - return NULL; + /* Make sure ri_oldTupleSlot is initialized. */ + if (unlikely(!resultRelInfo->ri_projectNewInfoValid)) + ExecInitUpdateProjection(context->mtstate, + resultRelInfo); - default: - /* see table_tuple_lock call in ExecDelete() */ - elog(ERROR, "unexpected table_tuple_lock status: %u", - result); - return NULL; - } + /* Fetch the most recent version of old tuple. */ + oldSlot = resultRelInfo->ri_oldTupleSlot; + if (!table_tuple_fetch_row_version(resultRelationDesc, + tupleid, + SnapshotAny, + oldSlot)) + elog(ERROR, "failed to fetch tuple being updated"); + slot = ExecGetUpdateNewTuple(resultRelInfo, + epqslot, oldSlot); + goto redo_act; } break; @@ -2674,7 +2601,7 @@ ExecOnConflictUpdate(ModifyTableContext *context, *returning = ExecUpdate(context, resultRelInfo, conflictTid, NULL, resultRelInfo->ri_onConflict->oc_ProjSlot, - canSetTag); + canSetTag, true); /* * Clear out existing tuple, as there might not be another conflict among @@ -3847,7 +3774,7 @@ ExecModifyTable(PlanState *pstate) /* Now apply the update. 
*/ slot = ExecUpdate(&context, resultRelInfo, tupleid, oldtuple, - slot, node->canSetTag); + slot, node->canSetTag, false); break; case CMD_DELETE: diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 417108f1e01..a5e8a90c508 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -191,6 +191,14 @@ typedef struct HeapPageFreeze } HeapPageFreeze; +typedef struct +{ + Buffer buffer; + Buffer vmbuffer; + bool have_tuple_lock; +} HeapLockContext; + + /* ---------------- * function prototypes for heap access method * @@ -243,17 +251,24 @@ extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, BulkInsertState bistate); extern TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, - struct TM_FailureData *tmfd, bool changingPart); + struct TM_FailureData *tmfd, bool changingPart, + Snapshot snapshot, + GetSlotCallback lockedSlotCallback, + void *lockedSlotCallbackArg); extern void heap_finish_speculative(Relation relation, ItemPointer tid); extern void heap_abort_speculative(Relation relation, ItemPointer tid); extern TM_Result heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, CommandId cid, Snapshot crosscheck, bool wait, - struct TM_FailureData *tmfd, LockTupleMode *lockmode); + struct TM_FailureData *tmfd, LockTupleMode *lockmode, + Snapshot snapshot, + GetSlotCallback lockedSlotCallback, + void *lockedSlotCallbackArg); extern TM_Result heap_lock_tuple(Relation relation, HeapTuple tuple, CommandId cid, LockTupleMode mode, LockWaitPolicy wait_policy, bool follow_updates, - Buffer *buffer, struct TM_FailureData *tmfd); + HeapLockContext *context, + struct TM_FailureData *tmfd); extern void heap_inplace_update(Relation relation, HeapTuple tuple); extern bool heap_prepare_freeze_tuple(HeapTupleHeader tuple, @@ -328,4 +343,9 @@ extern bool ResolveCminCmaxDuringDecoding(struct HTAB *tuplecid_data, extern void 
HeapCheckForSerializableConflictOut(bool visible, Relation relation, HeapTuple tuple, Buffer buffer, Snapshot snapshot); +extern TM_Result heapam_tuple_lock_internal(Relation relation, ItemPointer tid, Snapshot snapshot, + TupleTableSlot *slot, CommandId cid, LockTupleMode mode, + LockWaitPolicy wait_policy, uint8 flags, + TM_FailureData *tmfd, HeapLockContext *existingContext); + #endif /* HEAPAM_H */ diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 3fb184717f6..71b93f1674b 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -252,6 +252,9 @@ typedef void (*IndexBuildCallback) (Relation index, bool tupleIsAlive, void *state); +/* Typedef for callback returning the slot to store the locked tuple in */ +typedef TupleTableSlot *(*GetSlotCallback) (void *arg); + /* * API struct for a table AM. Note this must be allocated in a * server-lifetime manner, typically as a static const struct, which then gets @@ -514,7 +517,9 @@ typedef struct TableAmRoutine Snapshot crosscheck, bool wait, TM_FailureData *tmfd, - bool changingPart); + bool changingPart, + GetSlotCallback lockedSlotCallback, + void *lockedSlotCallbackArg); /* see table_tuple_update() for reference about parameters */ TM_Result (*tuple_update) (Relation rel, @@ -526,7 +531,9 @@ typedef struct TableAmRoutine bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode, - bool *update_indexes); + bool *update_indexes, + GetSlotCallback lockedSlotCallback, + void *lockedSlotCallbackArg); /* see table_tuple_lock() for reference about parameters */ TM_Result (*tuple_lock) (Relation rel, @@ -1449,6 +1456,8 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, * tmfd - filled in failure cases (see below) * changingPart - true iff the tuple is being moved to another partition * table due to an update of the partition key. Otherwise, false. + * lockedSlot - slot to save the locked tuple if should lock the last row + * version during the concurrent update. 
NULL if not needed. * * Normal, successful return value is TM_Ok, which means we did actually * delete it. Failure return codes are TM_SelfModified, TM_Updated, and @@ -1461,11 +1470,15 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, static inline TM_Result table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid, Snapshot snapshot, Snapshot crosscheck, bool wait, - TM_FailureData *tmfd, bool changingPart) + TM_FailureData *tmfd, bool changingPart, + GetSlotCallback lockedSlotCallback, + void *lockedSlotCallbackArg) { return rel->rd_tableam->tuple_delete(rel, tid, cid, snapshot, crosscheck, - wait, tmfd, changingPart); + wait, tmfd, changingPart, + lockedSlotCallback, + lockedSlotCallbackArg); } /* @@ -1487,7 +1500,9 @@ table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid, * lockmode - filled with lock mode acquired on tuple * update_indexes - in success cases this is set to true if new index entries * are required for this tuple - * + * lockedSlot - slot to save the locked tuple if should lock the last row + * version during the concurrent update. NULL if not needed. + * Normal, successful return value is TM_Ok, which means we did actually * update it. Failure return codes are TM_SelfModified, TM_Updated, and * TM_BeingModified (the last only possible if wait == false). @@ -1506,12 +1521,15 @@ static inline TM_Result table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, CommandId cid, Snapshot snapshot, Snapshot crosscheck, bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode, - bool *update_indexes) + bool *update_indexes, GetSlotCallback lockedSlotCallback, + void *lockedSlotCallbackArg) { return rel->rd_tableam->tuple_update(rel, otid, slot, cid, snapshot, crosscheck, wait, tmfd, - lockmode, update_indexes); + lockmode, update_indexes, + lockedSlotCallback, + lockedSlotCallbackArg); } /* -- 2.37.1 (Apple Git-137.1)