From 7bb1e45353f5b81a9963cda843b1aee0f65b05ad Mon Sep 17 00:00:00 2001 From: Alexander Korotkov Date: Thu, 30 Jun 2022 22:07:12 +0300 Subject: [PATCH] Lock updated tuples in tuple_update() and tuple_delete() --- src/backend/access/heap/heapam_handler.c | 99 ++++++++-- src/backend/access/table/tableam.c | 6 +- src/backend/executor/nodeModifyTable.c | 227 ++++++++--------------- src/include/access/tableam.h | 20 +- 4 files changed, 181 insertions(+), 171 deletions(-) diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 444f027149c..ffe78bb6b25 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -45,6 +45,12 @@ #include "utils/builtins.h" #include "utils/rel.h" +static TM_Result heapam_tuple_lock_internal(Relation relation, ItemPointer tid, + Snapshot snapshot, TupleTableSlot *slot, + CommandId cid, LockTupleMode mode, + LockWaitPolicy wait_policy, uint8 flags, + TM_FailureData *tmfd, bool updated); + static void reform_and_rewrite_tuple(HeapTuple tuple, Relation OldHeap, Relation NewHeap, Datum *values, bool *isnull, RewriteState rwstate); @@ -299,14 +305,38 @@ heapam_tuple_complete_speculative(Relation relation, TupleTableSlot *slot, static TM_Result heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot snapshot, Snapshot crosscheck, bool wait, - TM_FailureData *tmfd, bool changingPart) + TM_FailureData *tmfd, bool changingPart, + bool lockUpdated, TupleTableSlot *lockedSlot) { + TM_Result result; + /* * Currently Deleting of index tuples are handled at vacuum, in case if * the storage itself is cleaning the dead tuples by itself, it is the * time to call the index tuple deletion also. */ - return heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart); + result = heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart); + + if (result == TM_Updated && lockUpdated) + { + bool updated = false; + + if (!ItemPointerIndicatesMovedPartitions(&tmfd->ctid)) + updated = true; + result = heapam_tuple_lock_internal(relation, tid, snapshot, + lockedSlot, cid, LockTupleExclusive, + LockWaitBlock, + TUPLE_LOCK_FLAG_FIND_LAST_VERSION, + tmfd, updated); + + if (result == TM_Ok) + { + tmfd->traversed = true; + return TM_Updated; + } + } + + return result; } @@ -314,7 +344,8 @@ static TM_Result heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot, CommandId cid, Snapshot snapshot, Snapshot crosscheck, bool wait, TM_FailureData *tmfd, - LockTupleMode *lockmode, bool *update_indexes) + LockTupleMode *lockmode, bool *update_indexes, + bool lockUpdated, TupleTableSlot *lockedSlot) { bool shouldFree = true; HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); @@ -341,14 +372,34 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot, if (shouldFree) pfree(tuple); + if (result == TM_Updated && lockUpdated) + { + bool updated = false; + + if (!ItemPointerIndicatesMovedPartitions(&tmfd->ctid)) + updated = true; + + result = heapam_tuple_lock_internal(relation, otid, snapshot, + lockedSlot, cid, *lockmode, + LockWaitBlock, + TUPLE_LOCK_FLAG_FIND_LAST_VERSION, + tmfd, updated); + + if (result == TM_Ok) + { + tmfd->traversed = true; + return TM_Updated; + } + } + return result; } static TM_Result -heapam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot, - TupleTableSlot *slot, CommandId cid, LockTupleMode mode, - LockWaitPolicy wait_policy, uint8 flags, - TM_FailureData *tmfd) +heapam_tuple_lock_internal(Relation relation, ItemPointer tid, Snapshot snapshot, + TupleTableSlot *slot, CommandId cid, LockTupleMode mode, + LockWaitPolicy wait_policy, uint8 flags, + TM_FailureData *tmfd, bool updated) { BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot; TM_Result result; @@ -363,16 +414,30 @@ heapam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot, tuple_lock_retry: tuple->t_self = *tid; - result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy, - follow_updates, &buffer, tmfd); + if (!updated) + { + result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy, + follow_updates, &buffer, tmfd); + } + else + { + result = TM_Updated; + } if (result == TM_Updated && (flags & TUPLE_LOCK_FLAG_FIND_LAST_VERSION)) { - /* Should not encounter speculative tuple on recheck */ - Assert(!HeapTupleHeaderIsSpeculative(tuple->t_data)); + if (!updated) + { + /* Should not encounter speculative tuple on recheck */ + Assert(!HeapTupleHeaderIsSpeculative(tuple->t_data)); - ReleaseBuffer(buffer); + ReleaseBuffer(buffer); + } + else + { + updated = false; + } if (!ItemPointerEquals(&tmfd->ctid, &tuple->t_self)) { @@ -559,6 +624,16 @@ tuple_lock_retry: return result; } +static TM_Result +heapam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot, + TupleTableSlot *slot, CommandId cid, LockTupleMode mode, + LockWaitPolicy wait_policy, uint8 flags, + TM_FailureData *tmfd) +{ + return heapam_tuple_lock_internal(relation, tid, snapshot, slot, cid, mode, + wait_policy, flags, tmfd, false); +} + /* ------------------------------------------------------------------------ * DDL related callbacks for heap AM. diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index b3d1a6c3f8f..94bec180814 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -307,7 +307,8 @@ simple_table_tuple_delete(Relation rel, ItemPointer tid, Snapshot snapshot) GetCurrentCommandId(true), snapshot, InvalidSnapshot, true /* wait for commit */ , - &tmfd, false /* changingPart */ ); + &tmfd, false /* changingPart */ , + false, NULL); switch (result) { @@ -356,7 +357,8 @@ simple_table_tuple_update(Relation rel, ItemPointer otid, GetCurrentCommandId(true), snapshot, InvalidSnapshot, true /* wait for commit */ , - &tmfd, &lockmode, update_indexes); + &tmfd, &lockmode, update_indexes, + false, NULL); switch (result) { diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index a49c3da5b6c..41a955d7072 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -1218,7 +1218,8 @@ ExecDeletePrologue(ModifyTableContext *context, ResultRelInfo *resultRelInfo, */ static TM_Result ExecDeleteAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo, - ItemPointer tupleid, bool changingPart) + ItemPointer tupleid, bool changingPart, + bool lockUpdated, TupleTableSlot *lockedSlot) { EState *estate = context->estate; @@ -1228,7 +1229,9 @@ ExecDeleteAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo, estate->es_crosscheck_snapshot, true /* wait for commit */ , &context->tmfd, - changingPart); + changingPart, + lockUpdated, + lockedSlot); } /* @@ -1373,7 +1376,12 @@ ExecDelete(ModifyTableContext *context, * transaction-snapshot mode transactions. */ ldelete:; - result = ExecDeleteAct(context, resultRelInfo, tupleid, changingPart); + + if (!IsolationUsesXactSnapshot()) + slot = ExecGetReturningSlot(estate, resultRelInfo); + + result = ExecDeleteAct(context, resultRelInfo, tupleid, changingPart, + !IsolationUsesXactSnapshot(), slot); switch (result) { @@ -1432,81 +1440,28 @@ ldelete:; EvalPlanQualBegin(context->epqstate); inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc, resultRelInfo->ri_RangeTableIndex); + ExecCopySlot(inputslot, slot); - result = table_tuple_lock(resultRelationDesc, tupleid, - estate->es_snapshot, - inputslot, estate->es_output_cid, - LockTupleExclusive, LockWaitBlock, - TUPLE_LOCK_FLAG_FIND_LAST_VERSION, - &context->tmfd); + Assert(context->tmfd.traversed); + epqslot = EvalPlanQual(context->epqstate, + resultRelationDesc, + resultRelInfo->ri_RangeTableIndex, + inputslot); + if (TupIsNull(epqslot)) + /* Tuple not passing quals anymore, exiting... */ + return NULL; - switch (result) + /* + * If requested, skip delete and pass back the updated + * row. + */ + if (epqreturnslot) { - case TM_Ok: - Assert(context->tmfd.traversed); - epqslot = EvalPlanQual(context->epqstate, - resultRelationDesc, - resultRelInfo->ri_RangeTableIndex, - inputslot); - if (TupIsNull(epqslot)) - /* Tuple not passing quals anymore, exiting... */ - return NULL; - - /* - * If requested, skip delete and pass back the - * updated row. - */ - if (epqreturnslot) - { - *epqreturnslot = epqslot; - return NULL; - } - else - goto ldelete; - - case TM_SelfModified: - - /* - * This can be reached when following an update - * chain from a tuple updated by another session, - * reaching a tuple that was already updated in - * this transaction. If previously updated by this - * command, ignore the delete, otherwise error - * out. - * - * See also TM_SelfModified response to - * table_tuple_delete() above. - */ - if (context->tmfd.cmax != estate->es_output_cid) - ereport(ERROR, - (errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION), - errmsg("tuple to be deleted was already modified by an operation triggered by the current command"), - errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows."))); - return NULL; - - case TM_Deleted: - /* tuple already deleted; nothing to do */ - return NULL; - - default: - - /* - * TM_Invisible should be impossible because we're - * waiting for updated row versions, and would - * already have errored out if the first version - * is invisible. - * - * TM_Updated should be impossible, because we're - * locking the latest version via - * TUPLE_LOCK_FLAG_FIND_LAST_VERSION. - */ - elog(ERROR, "unexpected table_tuple_lock status: %u", - result); - return NULL; + *epqreturnslot = epqslot; + return NULL; } - - Assert(false); - break; + else + goto ldelete; } case TM_Deleted: @@ -1569,7 +1524,7 @@ ldelete:; { ExecForceStoreHeapTuple(oldtuple, slot, false); } - else + else if (TupIsNull(slot)) { if (!table_tuple_fetch_row_version(resultRelationDesc, tupleid, SnapshotAny, slot)) @@ -1838,7 +1793,8 @@ ExecUpdatePrepareSlot(ResultRelInfo *resultRelInfo, static TM_Result ExecUpdateAct(ModifyTableContext *context, ResultRelInfo *resultRelInfo, ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *slot, - bool canSetTag, UpdateContext *updateCxt) + bool canSetTag, UpdateContext *updateCxt, bool lockUpdated, + TupleTableSlot *lockedSlot) { EState *estate = context->estate; Relation resultRelationDesc = resultRelInfo->ri_RelationDesc; @@ -1972,7 +1928,8 @@ lreplace:; estate->es_crosscheck_snapshot, true /* wait for commit */ , &context->tmfd, &updateCxt->lockmode, - &updateCxt->updateIndexes); + &updateCxt->updateIndexes, + lockUpdated, lockedSlot); if (result == TM_Ok) updateCxt->updated = true; @@ -2123,7 +2080,7 @@ ExecCrossPartitionUpdateForeignKey(ModifyTableContext *context, static TupleTableSlot * ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo, ItemPointer tupleid, HeapTuple oldtuple, TupleTableSlot *slot, - bool canSetTag) + bool canSetTag, bool locked) { EState *estate = context->estate; Relation resultRelationDesc = resultRelInfo->ri_RelationDesc; @@ -2176,12 +2133,32 @@ ExecUpdate(ModifyTableContext *context, ResultRelInfo *resultRelInfo, } else { + TupleTableSlot *oldSlot; + bool lockUpdated; + /* Fill in the slot appropriately */ ExecUpdatePrepareSlot(resultRelInfo, slot, estate); redo_act: + if (!IsolationUsesXactSnapshot() && !locked) + { + /* Make sure ri_oldTupleSlot is initialized. */ + if (unlikely(!resultRelInfo->ri_projectNewInfoValid)) + ExecInitUpdateProjection(context->mtstate, + resultRelInfo); + + /* Fetch the most recent version of old tuple. */ + lockUpdated = true; + oldSlot = resultRelInfo->ri_oldTupleSlot; + } + else + { + lockUpdated = false; + oldSlot = NULL; + } + result = ExecUpdateAct(context, resultRelInfo, tupleid, oldtuple, slot, - canSetTag, &updateCxt); + canSetTag, &updateCxt, lockUpdated, oldSlot); /* * If ExecUpdateAct reports that a cross-partition update was done, @@ -2234,12 +2211,12 @@ redo_act: { TupleTableSlot *inputslot; TupleTableSlot *epqslot; - TupleTableSlot *oldSlot; if (IsolationUsesXactSnapshot()) ereport(ERROR, (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), errmsg("could not serialize access due to concurrent update"))); + Assert(!locked); /* * Already know that we're going to need to do EPQ, so @@ -2247,73 +2224,19 @@ redo_act: */ inputslot = EvalPlanQualSlot(context->epqstate, resultRelationDesc, resultRelInfo->ri_RangeTableIndex); - - result = table_tuple_lock(resultRelationDesc, tupleid, - estate->es_snapshot, - inputslot, estate->es_output_cid, - updateCxt.lockmode, LockWaitBlock, - TUPLE_LOCK_FLAG_FIND_LAST_VERSION, - &context->tmfd); - - switch (result) - { - case TM_Ok: - Assert(context->tmfd.traversed); - - epqslot = EvalPlanQual(context->epqstate, - resultRelationDesc, - resultRelInfo->ri_RangeTableIndex, - inputslot); - if (TupIsNull(epqslot)) - /* Tuple not passing quals anymore, exiting... */ - return NULL; - - /* Make sure ri_oldTupleSlot is initialized. */ - if (unlikely(!resultRelInfo->ri_projectNewInfoValid)) - ExecInitUpdateProjection(context->mtstate, - resultRelInfo); - - /* Fetch the most recent version of old tuple. */ - oldSlot = resultRelInfo->ri_oldTupleSlot; - if (!table_tuple_fetch_row_version(resultRelationDesc, - tupleid, - SnapshotAny, - oldSlot)) - elog(ERROR, "failed to fetch tuple being updated"); - slot = ExecGetUpdateNewTuple(resultRelInfo, - epqslot, oldSlot); - goto redo_act; - - case TM_Deleted: - /* tuple already deleted; nothing to do */ - return NULL; - - case TM_SelfModified: - - /* - * This can be reached when following an update - * chain from a tuple updated by another session, - * reaching a tuple that was already updated in - * this transaction. If previously modified by - * this command, ignore the redundant update, - * otherwise error out. - * - * See also TM_SelfModified response to - * table_tuple_update() above. - */ - if (context->tmfd.cmax != estate->es_output_cid) - ereport(ERROR, - (errcode(ERRCODE_TRIGGERED_DATA_CHANGE_VIOLATION), - errmsg("tuple to be updated was already modified by an operation triggered by the current command"), - errhint("Consider using an AFTER trigger instead of a BEFORE trigger to propagate changes to other rows."))); - return NULL; - - default: - /* see table_tuple_lock call in ExecDelete() */ - elog(ERROR, "unexpected table_tuple_lock status: %u", - result); - return NULL; - } + ExecCopySlot(inputslot, oldSlot); + Assert(context->tmfd.traversed); + + epqslot = EvalPlanQual(context->epqstate, + resultRelationDesc, + resultRelInfo->ri_RangeTableIndex, + inputslot); + if (TupIsNull(epqslot)) + /* Tuple not passing quals anymore, exiting... */ + return NULL; + slot = ExecGetUpdateNewTuple(resultRelInfo, + epqslot, oldSlot); + goto redo_act; } break; @@ -2557,7 +2480,7 @@ ExecOnConflictUpdate(ModifyTableContext *context, *returning = ExecUpdate(context, resultRelInfo, conflictTid, NULL, resultRelInfo->ri_onConflict->oc_ProjSlot, - canSetTag); + canSetTag, true); /* * Clear out existing tuple, as there might not be another conflict among @@ -2764,7 +2687,8 @@ lmerge_matched:; } ExecUpdatePrepareSlot(resultRelInfo, newslot, context->estate); result = ExecUpdateAct(context, resultRelInfo, tupleid, NULL, - newslot, mtstate->canSetTag, &updateCxt); + newslot, mtstate->canSetTag, &updateCxt, + false, NULL); if (result == TM_Ok && updateCxt.updated) { ExecUpdateEpilogue(context, &updateCxt, resultRelInfo, @@ -2782,7 +2706,8 @@ lmerge_matched:; result = TM_Ok; break; } - result = ExecDeleteAct(context, resultRelInfo, tupleid, false); + result = ExecDeleteAct(context, resultRelInfo, tupleid, false, + false, NULL); if (result == TM_Ok) { ExecDeleteEpilogue(context, resultRelInfo, tupleid, NULL, @@ -3733,7 +3658,7 @@ ExecModifyTable(PlanState *pstate) /* Now apply the update. */ slot = ExecUpdate(&context, resultRelInfo, tupleid, oldtuple, - slot, node->canSetTag); + slot, node->canSetTag, false); break; case CMD_DELETE: diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index fe869c6c184..2d990163487 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -514,7 +514,9 @@ typedef struct TableAmRoutine Snapshot crosscheck, bool wait, TM_FailureData *tmfd, - bool changingPart); + bool changingPart, + bool lockUpdated, + TupleTableSlot *lockedSlot); /* see table_tuple_update() for reference about parameters */ TM_Result (*tuple_update) (Relation rel, @@ -526,7 +528,9 @@ typedef struct TableAmRoutine bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode, - bool *update_indexes); + bool *update_indexes, + bool lockUpdated, + TupleTableSlot *lockedSlot); /* see table_tuple_lock() for reference about parameters */ TM_Result (*tuple_lock) (Relation rel, @@ -1461,11 +1465,13 @@ table_multi_insert(Relation rel, TupleTableSlot **slots, int nslots, static inline TM_Result table_tuple_delete(Relation rel, ItemPointer tid, CommandId cid, Snapshot snapshot, Snapshot crosscheck, bool wait, - TM_FailureData *tmfd, bool changingPart) + TM_FailureData *tmfd, bool changingPart, + bool lockUpdated, TupleTableSlot *lockedSlot) { return rel->rd_tableam->tuple_delete(rel, tid, cid, snapshot, crosscheck, - wait, tmfd, changingPart); + wait, tmfd, changingPart, + lockUpdated, lockedSlot); } /* @@ -1506,12 +1512,14 @@ static inline TM_Result table_tuple_update(Relation rel, ItemPointer otid, TupleTableSlot *slot, CommandId cid, Snapshot snapshot, Snapshot crosscheck, bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode, - bool *update_indexes) + bool *update_indexes, bool lockUpdated, + TupleTableSlot *lockedSlot) { return rel->rd_tableam->tuple_update(rel, otid, slot, cid, snapshot, crosscheck, wait, tmfd, - lockmode, update_indexes); + lockmode, update_indexes, + lockUpdated, lockedSlot); } /* -- 2.24.3 (Apple Git-128)