Re: invalid tid errors in latest 7.3.4 stable. - Mailing list pgsql-hackers
From | Tom Lane |
---|---|
Subject | Re: invalid tid errors in latest 7.3.4 stable. |
Date | |
Msg-id | 9129.1065044188@sss.pgh.pa.us Whole thread Raw |
In response to | invalid tid errors in latest 7.3.4 stable. (Wade Klaver <archeron@wavefire.com>) |
Responses |
Re: invalid tid errors in latest 7.3.4 stable.
|
List | pgsql-hackers |
Stephan Szabo <sszabo@megazone.bigpanda.com> writes: > On Tue, 30 Sep 2003, Tom Lane wrote: >> I think I can implement it and it will act as stated in my proposal. >> Whether people like the proposed behavior is the big question in my >> mind. > I think it's more reasonable than the current behavior or any of > the others we've hit along the way, and we have to pretty much choose > now if we want to change it for 7.4. I've committed the attached patch. One thing I wanted to double-check with you is that the SELECT FOR UPDATES done in the noaction cases are being correctly handled. I think it is correct to do them with the current snapshot rather than the start-of-transaction snap; do you agree? Also, I did not propagate the crosscheck support into heap_mark4update, meaning that these SELECT FOR UPDATEs won't complain if they find a row that was inserted later than the start of the serializable transaction. I'm not totally sure if they should or not; what do you think? regards, tom lane *** src/backend/access/heap/heapam.c.orig Thu Sep 25 10:22:54 2003 --- src/backend/access/heap/heapam.c Wed Oct 1 16:02:27 2003 *************** *** 1207,1220 **** * NB: do not call this directly unless you are prepared to deal with * concurrent-update conditions. Use simple_heap_delete instead. * * Normal, successful return value is HeapTupleMayBeUpdated, which * actuallymeans we did delete it. Failure return codes are * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated ! * (the last only possible if wait == false). */ int heap_delete(Relation relation, ItemPointer tid, ! ItemPointer ctid, CommandId cid, bool wait) { ItemId lp; HeapTupleData tp; --- 1207,1229 ---- * NB: do not call this directly unless you are prepared to deal with * concurrent-update conditions. Use simple_heap_delete instead. * + * relation - table to be modified + * tid - TID of tuple to be deleted + * ctid - output parameter, used only for failure case (see below) + * cid - delete command ID to use in verifying tuple visibility + * crosscheck - if not SnapshotAny, also check tuple against this + * wait - true if should wait for any conflicting update to commit/abort + * * Normal, successful return value is HeapTupleMayBeUpdated, which * actually means we did delete it. Failure returncodes are * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated ! * (the last only possible if wait == false). On a failure return, ! * *ctid is set to the ctid link of the target tuple (possibly a later ! * version of the row). */ int heap_delete(Relation relation, ItemPointer tid, ! ItemPointer ctid, CommandId cid, Snapshot crosscheck, bool wait) { ItemId lp; HeapTupleDatatp; *************** *** 1240,1246 **** tp.t_tableOid = relation->rd_id; l1: ! result = HeapTupleSatisfiesUpdate(&tp, cid); if (result == HeapTupleInvisible) { --- 1249,1255 ---- tp.t_tableOid = relation->rd_id; l1: ! result = HeapTupleSatisfiesUpdate(tp.t_data, cid); if (result == HeapTupleInvisible) { *************** *** 1278,1283 **** --- 1287,1300 ---- else result = HeapTupleUpdated; } + + if (crosscheck != SnapshotAny && result == HeapTupleMayBeUpdated) + { + /* Perform additional check for serializable RI updates */ + if (!HeapTupleSatisfiesSnapshot(tp.t_data, crosscheck)) + result = HeapTupleUpdated; + } + if (result != HeapTupleMayBeUpdated) { Assert(result == HeapTupleSelfUpdated || *************** *** 1378,1384 **** result = heap_delete(relation, tid, &ctid, ! GetCurrentCommandId(), true /* wait for commit */); switch (result) { --- 1395,1401 ---- result = heap_delete(relation, tid, &ctid, ! GetCurrentCommandId(), SnapshotAny, true /* wait for commit */); switch (result) { *************** *** 1407,1420 **** * NB: do not call this directly unless you are prepared to deal with * concurrent-update conditions. Use simple_heap_update instead. * * Normal, successful return value is HeapTupleMayBeUpdated, which * actuallymeans we *did* update it. Failure return codes are * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated ! * (the last only possible if wait == false). */ int heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, ! ItemPointer ctid, CommandId cid, bool wait) { ItemId lp; HeapTupleData oldtup; --- 1424,1449 ---- * NB: do not call this directly unless you are prepared to deal with * concurrent-update conditions. Use simple_heap_update instead. * + * relation - table to be modified + * otid - TID of old tuple to be replaced + * newtup - newly constructed tuple data to store + * ctid - output parameter, used only for failure case (see below) + * cid - update command ID to use in verifying old tuple visibility + * crosscheck - if not SnapshotAny, also check old tuple against this + * wait - true if should wait for any conflicting update to commit/abort + * * Normal, successful return value is HeapTupleMayBeUpdated, which * actually means we *did* update it. Failure returncodes are * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated ! * (the last only possible if wait == false). On a failure return, ! * *ctid is set to the ctid link of the old tuple (possibly a later ! * version of the row). ! * On success, newtup->t_self is set to the TID where the new tuple ! * was inserted. */ int heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, ! ItemPointer ctid, CommandId cid, Snapshot crosscheck, bool wait) { ItemId lp; HeapTupleDataoldtup; *************** *** 1450,1456 **** */ l2: ! result = HeapTupleSatisfiesUpdate(&oldtup, cid); if (result == HeapTupleInvisible) { --- 1479,1485 ---- */ l2: ! result = HeapTupleSatisfiesUpdate(oldtup.t_data, cid); if (result == HeapTupleInvisible) { *************** *** 1488,1493 **** --- 1517,1530 ---- else result = HeapTupleUpdated; } + + if (crosscheck != SnapshotAny && result == HeapTupleMayBeUpdated) + { + /* Perform additional check for serializable RI updates */ + if (!HeapTupleSatisfiesSnapshot(oldtup.t_data, crosscheck)) + result = HeapTupleUpdated; + } + if (result != HeapTupleMayBeUpdated) { Assert(result == HeapTupleSelfUpdated || *************** *** 1718,1724 **** result = heap_update(relation, otid, tup, &ctid, ! GetCurrentCommandId(), true /* wait for commit */); switch (result) { --- 1755,1761 ---- result = heap_update(relation, otid, tup, &ctid, ! GetCurrentCommandId(), SnapshotAny, true /* wait for commit */); switch (result) { *************** *** 1767,1773 **** tuple->t_len = ItemIdGetLength(lp); l3: ! result = HeapTupleSatisfiesUpdate(tuple, cid); if (result == HeapTupleInvisible) { --- 1804,1810 ---- tuple->t_len = ItemIdGetLength(lp); l3: ! result = HeapTupleSatisfiesUpdate(tuple->t_data, cid); if (result == HeapTupleInvisible) { *** src/backend/commands/async.c.orig Mon Sep 15 19:33:39 2003 --- src/backend/commands/async.c Wed Oct 1 15:36:02 2003 *************** *** 537,543 **** */ result = heap_update(lRel, &lTuple->t_self, rTuple, &ctid, ! GetCurrentCommandId(), false /* no wait for commit*/); switch (result) { --- 537,543 ---- */ result = heap_update(lRel, &lTuple->t_self, rTuple, &ctid, ! GetCurrentCommandId(), SnapshotAny, false /*no wait for commit */); switch (result) { *** src/backend/executor/execMain.c.orig Thu Sep 25 14:58:35 2003 --- src/backend/executor/execMain.c Wed Oct 1 17:22:30 2003 *************** *** 104,111 **** * field of the QueryDesc is filled in to describe the tuples that will be * returned, and the internalfields (estate and planstate) are set up. * ! * If useSnapshotNow is true, run the query with SnapshotNow time qual rules ! * instead of the normal use of QuerySnapshot. * * If explainOnly is true, we are not actually intending to run the plan, * only to set up for EXPLAIN; so skip unwanted side-effects. --- 104,117 ---- * field of the QueryDesc is filled in to describe the tuples that will be * returned, and the internalfields (estate and planstate) are set up. * ! * If useCurrentSnapshot is true, run the query with the latest available ! * snapshot, instead of the normal QuerySnapshot. Also, if it's an update ! * or delete query, check that the rows to be updated or deleted would be ! * visible to the normal QuerySnapshot. (This is a special-case behavior ! * needed for referential integrity updates in serializable transactions. ! * We must check all currently-committed rows, but we want to throw a ! * can't-serialize error if any rows that would need updates would not be ! * visible under the normal serializable snapshot.) * * If explainOnly is true, we are not actually intending to runthe plan, * only to set up for EXPLAIN; so skip unwanted side-effects. *************** *** 115,121 **** * ---------------------------------------------------------------- */ void ! ExecutorStart(QueryDesc *queryDesc, bool useSnapshotNow, bool explainOnly) { EState *estate; MemoryContextoldcontext; --- 121,127 ---- * ---------------------------------------------------------------- */ void ! ExecutorStart(QueryDesc *queryDesc, bool useCurrentSnapshot, bool explainOnly) { EState *estate; MemoryContextoldcontext; *************** *** 157,171 **** * the life of this query, even if it outlives the current command and * current snapshot. */ ! if (useSnapshotNow) { ! estate->es_snapshot = SnapshotNow; ! estate->es_snapshot_cid = GetCurrentCommandId(); } else { estate->es_snapshot = CopyQuerySnapshot(); ! estate->es_snapshot_cid = estate->es_snapshot->curcid; } /* --- 163,180 ---- * the life of this query, even if it outlives the current command and * current snapshot. */ ! if (useCurrentSnapshot) { ! /* RI update/delete query --- must use an up-to-date snapshot */ ! estate->es_snapshot = CopyCurrentSnapshot(); ! /* crosscheck updates/deletes against transaction snapshot */ ! estate->es_crosscheck_snapshot = CopyQuerySnapshot(); } else { + /* normal query --- use query snapshot, no crosscheck */ estate->es_snapshot = CopyQuerySnapshot(); ! estate->es_crosscheck_snapshot = SnapshotAny; } /* *************** *** 1118,1124 **** tuple.t_self = *((ItemPointer) DatumGetPointer(datum)); test= heap_mark4update(erm->relation, &tuple, &buffer, ! estate->es_snapshot_cid); ReleaseBuffer(buffer); switch (test) { --- 1127,1133 ---- tuple.t_self = *((ItemPointer) DatumGetPointer(datum)); test= heap_mark4update(erm->relation, &tuple, &buffer, ! estate->es_snapshot->curcid); ReleaseBuffer(buffer); switch (test) { *************** *** 1278,1284 **** if (estate->es_into_relation_descriptor != NULL) { heap_insert(estate->es_into_relation_descriptor,tuple, ! estate->es_snapshot_cid); IncrAppended(); } --- 1287,1293 ---- if (estate->es_into_relation_descriptor != NULL) { heap_insert(estate->es_into_relation_descriptor,tuple, ! estate->es_snapshot->curcid); IncrAppended(); } *************** *** 1354,1360 **** * insert the tuple */ newId = heap_insert(resultRelationDesc, tuple, ! estate->es_snapshot_cid); IncrAppended(); (estate->es_processed)++; --- 1363,1369 ---- * insert the tuple */ newId = heap_insert(resultRelationDesc, tuple, ! estate->es_snapshot->curcid); IncrAppended(); (estate->es_processed)++; *************** *** 1406,1412 **** bool dodelete; dodelete = ExecBRDeleteTriggers(estate, resultRelInfo, tupleid, ! estate->es_snapshot_cid); if (!dodelete) /* "do nothing" */ return; --- 1415,1421 ---- bool dodelete; dodelete = ExecBRDeleteTriggers(estate, resultRelInfo, tupleid, ! estate->es_snapshot->curcid); if (!dodelete) /* "do nothing"*/ return; *************** *** 1418,1424 **** ldelete:; result = heap_delete(resultRelationDesc, tupleid, &ctid, ! estate->es_snapshot_cid, true /* wait for commit */); switch (result) { --- 1427,1434 ---- ldelete:; result = heap_delete(resultRelationDesc, tupleid, &ctid, ! estate->es_snapshot->curcid, ! estate->es_crosscheck_snapshot, true /* wait for commit */); switch(result) { *************** *** 1517,1523 **** newtuple = ExecBRUpdateTriggers(estate, resultRelInfo, tupleid, tuple, ! estate->es_snapshot_cid); if (newtuple == NULL) /* "do nothing" */ return; --- 1527,1533 ---- newtuple = ExecBRUpdateTriggers(estate, resultRelInfo, tupleid, tuple, ! estate->es_snapshot->curcid); if (newtuple == NULL) /* "do nothing"*/ return; *************** *** 1553,1559 **** */ result = heap_update(resultRelationDesc, tupleid, tuple, &ctid, ! estate->es_snapshot_cid, true /* wait for commit */); switch (result) { --- 1563,1570 ---- */ result = heap_update(resultRelationDesc, tupleid, tuple, &ctid, ! estate->es_snapshot->curcid, ! estate->es_crosscheck_snapshot, true /* wait for commit */); switch(result) { *************** *** 2039,2045 **** */ epqstate->es_direction = ForwardScanDirection; epqstate->es_snapshot = estate->es_snapshot; ! epqstate->es_snapshot_cid = estate->es_snapshot_cid; epqstate->es_range_table = estate->es_range_table; epqstate->es_result_relations= estate->es_result_relations; epqstate->es_num_result_relations = estate->es_num_result_relations; --- 2050,2056 ---- */ epqstate->es_direction = ForwardScanDirection; epqstate->es_snapshot = estate->es_snapshot; ! epqstate->es_crosscheck_snapshot = estate->es_crosscheck_snapshot; epqstate->es_range_table = estate->es_range_table; epqstate->es_result_relations = estate->es_result_relations; epqstate->es_num_result_relations= estate->es_num_result_relations; *** src/backend/executor/execUtils.c.orig Thu Sep 25 14:58:35 2003 --- src/backend/executor/execUtils.c Wed Oct 1 15:35:55 2003 *************** *** 178,184 **** */ estate->es_direction = ForwardScanDirection; estate->es_snapshot = SnapshotNow; ! estate->es_snapshot_cid = FirstCommandId; estate->es_range_table = NIL; estate->es_result_relations = NULL; --- 178,184 ---- */ estate->es_direction = ForwardScanDirection; estate->es_snapshot = SnapshotNow; ! estate->es_crosscheck_snapshot = SnapshotAny; /* means no crosscheck */ estate->es_range_table = NIL; estate->es_result_relations= NULL; *** src/backend/executor/nodeSubplan.c.orig Thu Sep 25 14:58:35 2003 --- src/backend/executor/nodeSubplan.c Wed Oct 1 17:22:30 2003 *************** *** 709,715 **** sp_estate->es_tupleTable = ExecCreateTupleTable(ExecCountSlotsNode(subplan->plan) + 10); sp_estate->es_snapshot = estate->es_snapshot; ! sp_estate->es_snapshot_cid = estate->es_snapshot_cid; sp_estate->es_instrument = estate->es_instrument; /* --- 709,715 ---- sp_estate->es_tupleTable = ExecCreateTupleTable(ExecCountSlotsNode(subplan->plan) + 10); sp_estate->es_snapshot = estate->es_snapshot; ! sp_estate->es_crosscheck_snapshot = estate->es_crosscheck_snapshot; sp_estate->es_instrument = estate->es_instrument; /* *** src/backend/executor/nodeSubqueryscan.c.orig Thu Sep 25 14:58:35 2003 --- src/backend/executor/nodeSubqueryscan.c Wed Oct 1 17:22:31 2003 *************** *** 177,183 **** sp_estate->es_tupleTable = ExecCreateTupleTable(ExecCountSlotsNode(node->subplan) + 10); sp_estate->es_snapshot = estate->es_snapshot; ! sp_estate->es_snapshot_cid = estate->es_snapshot_cid; sp_estate->es_instrument = estate->es_instrument; /* --- 177,183 ---- sp_estate->es_tupleTable = ExecCreateTupleTable(ExecCountSlotsNode(node->subplan) + 10); sp_estate->es_snapshot = estate->es_snapshot; ! sp_estate->es_crosscheck_snapshot = estate->es_crosscheck_snapshot; sp_estate->es_instrument = estate->es_instrument; /* *** src/backend/executor/spi.c.orig Thu Sep 25 14:58:35 2003 --- src/backend/executor/spi.c Wed Oct 1 16:33:44 2003 *************** *** 33,43 **** static int _SPI_execute(const char *src, int tcount, _SPI_plan *plan); static int _SPI_pquery(QueryDesc*queryDesc, bool runit, ! bool useSnapshotNow, int tcount); static int _SPI_execute_plan(_SPI_plan *plan, Datum *Values, const char *Nulls, ! bool useSnapshotNow, int tcount); static void _SPI_cursor_operation(Portal portal, bool forward,int count, DestReceiver *dest); --- 33,43 ---- static int _SPI_execute(const char *src, int tcount, _SPI_plan *plan); static int _SPI_pquery(QueryDesc*queryDesc, bool runit, ! bool useCurrentSnapshot, int tcount); static int _SPI_execute_plan(_SPI_plan *plan, Datum *Values, const char *Nulls, ! bool useCurrentSnapshot, int tcount); static void _SPI_cursor_operation(Portal portal, boolforward, int count, DestReceiver *dest); *************** *** 245,256 **** } /* ! * SPI_execp_now -- identical to SPI_execp, except that we use SnapshotNow ! * instead of the normal QuerySnapshot. This is currently not documented ! * in spi.sgml because it is only intended for use by RI triggers. */ int ! SPI_execp_now(void *plan, Datum *Values, const char *Nulls, int tcount) { int res; --- 245,258 ---- } /* ! * SPI_execp_current -- identical to SPI_execp, except that we expose the ! * Executor option to use a current snapshot instead of the normal ! * QuerySnapshot. This is currently not documented in spi.sgml because ! * it is only intended for use by RI triggers. */ int ! SPI_execp_current(void *plan, Datum *Values, const char *Nulls, ! bool useCurrentSnapshot, int tcount) { int res; *************** *** 264,270 **** if (res < 0) return res; ! res = _SPI_execute_plan((_SPI_plan *) plan, Values, Nulls, true, tcount); _SPI_end_call(true); return res; --- 266,273 ---- if (res < 0) return res; ! res = _SPI_execute_plan((_SPI_plan *) plan, Values, Nulls, ! useCurrentSnapshot, tcount); _SPI_end_call(true); return res; *************** *** 1124,1130 **** static int _SPI_execute_plan(_SPI_plan *plan, Datum *Values, const char *Nulls, ! bool useSnapshotNow, int tcount) { List *query_list_list = plan->qtlist; List *plan_list= plan->ptlist; --- 1127,1133 ---- static int _SPI_execute_plan(_SPI_plan *plan, Datum *Values, const char *Nulls, ! bool useCurrentSnapshot, int tcount) { List *query_list_list = plan->qtlist; List *plan_list = plan->ptlist; *************** *** 1195,1201 **** { qdesc = CreateQueryDesc(queryTree, planTree, dest, paramLI, false); ! res = _SPI_pquery(qdesc, true, useSnapshotNow, queryTree->canSetTag ?tcount : 0); if (res < 0) return res; --- 1198,1204 ---- { qdesc = CreateQueryDesc(queryTree, planTree, dest, paramLI, false); ! res = _SPI_pquery(qdesc, true, useCurrentSnapshot, queryTree->canSetTag? tcount : 0); if (res < 0) return res; *************** *** 1208,1214 **** } static int ! _SPI_pquery(QueryDesc *queryDesc, bool runit, bool useSnapshotNow, int tcount) { int operation = queryDesc->operation; int res; --- 1211,1218 ---- } static int ! _SPI_pquery(QueryDesc *queryDesc, bool runit, ! bool useCurrentSnapshot, int tcount) { int operation = queryDesc->operation; int res; *************** *** 1245,1251 **** ResetUsage(); #endif ! ExecutorStart(queryDesc, useSnapshotNow, false); ExecutorRun(queryDesc, ForwardScanDirection, (long) tcount); --- 1249,1255 ---- ResetUsage(); #endif ! ExecutorStart(queryDesc, useCurrentSnapshot, false); ExecutorRun(queryDesc, ForwardScanDirection, (long) tcount); *** src/backend/storage/ipc/sinval.c.orig Wed Sep 24 14:54:01 2003 --- src/backend/storage/ipc/sinval.c Wed Oct 1 16:02:42 2003 *************** *** 330,339 **** * lastBackend would be sufficient. But it seems better to do the * malloc while not holding thelock, so we can't look at lastBackend. * ! * if (snapshot->xip != NULL) no need to free and reallocate xip; ! * ! * We can reuse the old xip array, because MaxBackends does not change at ! * runtime. */ if (snapshot->xip == NULL) { --- 330,339 ---- * lastBackend would be sufficient. But it seems better to do the * malloc while not holding thelock, so we can't look at lastBackend. * ! * This does open a possibility for avoiding repeated malloc/free: ! * since MaxBackends does not change at runtime, we can simply reuse ! * the previous xip array if any. (This relies on the fact that all ! * calls pass static SnapshotData structs.) */ if (snapshot->xip == NULL) { *** src/backend/utils/adt/ri_triggers.c.orig Mon Sep 29 13:44:31 2003 --- src/backend/utils/adt/ri_triggers.c Wed Oct 1 16:49:40 2003 *************** *** 157,162 **** --- 157,163 ---- static bool ri_PerformCheck(RI_QueryKey *qkey, void *qplan, Relation fk_rel, Relation pk_rel, HeapTuple old_tuple, HeapTuple new_tuple, + bool detectNewRows, int expect_OK, const char *constrname); static void ri_ExtractValues(RI_QueryKey*qkey, int key_idx, Relation rel, HeapTuple tuple, *************** *** 276,281 **** --- 277,283 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel, NULL,NULL, + false, SPI_OK_SELECT, tgargs[RI_CONSTRAINT_NAME_ARGNO]); *************** *** 433,438 **** --- 435,441 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel, NULL, new_row, + false, SPI_OK_SELECT, tgargs[RI_CONSTRAINT_NAME_ARGNO]); *************** *** 594,599 **** --- 597,603 ---- result = ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel, old_row, NULL, + true, /* treat like update */ SPI_OK_SELECT, NULL); if (SPI_finish() != SPI_OK_FINISH) *************** *** 752,757 **** --- 756,762 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel, old_row, NULL, + true, /* must detect new rows */ SPI_OK_SELECT, tgargs[RI_CONSTRAINT_NAME_ARGNO]); *************** *** 942,947 **** --- 947,953 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel, old_row, NULL, + true, /* must detect new rows */ SPI_OK_SELECT, tgargs[RI_CONSTRAINT_NAME_ARGNO]); *************** *** 1102,1107 **** --- 1108,1114 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel, old_row, NULL, + true, /* must detect new rows */ SPI_OK_DELETE, tgargs[RI_CONSTRAINT_NAME_ARGNO]); *************** *** 1285,1290 **** --- 1292,1298 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel, old_row, new_row, + true, /* must detect new rows */ SPI_OK_UPDATE, tgargs[RI_CONSTRAINT_NAME_ARGNO]); *************** *** 1453,1458 **** --- 1461,1467 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel, old_row, NULL, + true, /* must detect new rows */ SPI_OK_SELECT, tgargs[RI_CONSTRAINT_NAME_ARGNO]); *************** *** 1633,1638 **** --- 1642,1648 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel, old_row, NULL, + true, /* must detect new rows */ SPI_OK_SELECT, tgargs[RI_CONSTRAINT_NAME_ARGNO]); *************** *** 1802,1807 **** --- 1812,1818 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel, old_row, NULL, + true, /* must detect new rows */ SPI_OK_UPDATE, tgargs[RI_CONSTRAINT_NAME_ARGNO]); *************** *** 2019,2024 **** --- 2030,2036 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel, old_row, NULL, + true, /* must detect new rows */ SPI_OK_UPDATE, tgargs[RI_CONSTRAINT_NAME_ARGNO]); *************** *** 2188,2193 **** --- 2200,2206 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel, old_row, NULL, + true, /* must detect new rows */ SPI_OK_UPDATE, tgargs[RI_CONSTRAINT_NAME_ARGNO]); *************** *** 2392,2397 **** --- 2405,2411 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel, old_row, NULL, + true, /* must detect new rows */ SPI_OK_UPDATE, tgargs[RI_CONSTRAINT_NAME_ARGNO]); *************** *** 2788,2798 **** --- 2802,2814 ---- ri_PerformCheck(RI_QueryKey *qkey, void *qplan, Relation fk_rel, Relation pk_rel, HeapTuple old_tuple, HeapTuple new_tuple, + bool detectNewRows, int expect_OK, const char *constrname) { Relation query_rel, source_rel; int key_idx; + bool useCurrentSnapshot; int limit; int spi_result; AclId save_uid; *************** *** 2842,2850 **** vals, nulls); } ! /* Switch to proper UID to perform check as */ ! save_uid = GetUserId(); ! SetUserId(RelationGetForm(query_rel)->relowner); /* * If this is a select query (e.g., for a 'no action'or 'restrict' --- 2858,2882 ---- vals, nulls); } ! /* ! * In READ COMMITTED mode, we just need to make sure the regular query ! * snapshot is up-to-date, and we will see all rows that could be ! * interesting. In SERIALIZABLE mode, we can't update the regular query ! * snapshot. If the caller passes detectNewRows == false then it's okay ! * to do the query with the transaction snapshot; otherwise we tell the ! * executor to force a current snapshot (and error out if it finds any ! * rows under current snapshot that wouldn't be visible per the ! * transaction snapshot). ! */ ! if (XactIsoLevel == XACT_SERIALIZABLE) ! { ! useCurrentSnapshot = detectNewRows; ! } ! else ! { ! SetQuerySnapshot(); ! useCurrentSnapshot = false; ! } /* * If this is a select query (e.g., for a 'no action' or 'restrict' *************** *** 2854,2872 **** */ limit = (expect_OK == SPI_OK_SELECT) ? 1 : 0; ! /* ! * Run the plan, using SnapshotNow time qual rules so that we can see ! * all committed tuples, even those committed after our own transaction ! * or query started. ! */ ! spi_result = SPI_execp_now(qplan, vals, nulls, limit); /* Restore UID */ SetUserId(save_uid); /* Checkresult */ if (spi_result < 0) ! elog(ERROR, "SPI_execp_now returned %d", spi_result); if (expect_OK >= 0 && spi_result != expect_OK) ri_ReportViolation(qkey, constrname ? constrname : "", --- 2886,2905 ---- */ limit = (expect_OK == SPI_OK_SELECT) ? 1 : 0; ! /* Switch to proper UID to perform check as */ ! save_uid = GetUserId(); ! SetUserId(RelationGetForm(query_rel)->relowner); ! ! /* Finally we can run the query. */ ! spi_result = SPI_execp_current(qplan, vals, nulls, ! useCurrentSnapshot, limit); /* Restore UID */ SetUserId(save_uid); /*Check result */ if (spi_result < 0) ! elog(ERROR, "SPI_execp_current returned %d", spi_result); if (expect_OK >= 0 && spi_result != expect_OK) ri_ReportViolation(qkey, constrname ? constrname : "", *** src/backend/utils/time/tqual.c.orig Thu Sep 25 14:58:35 2003 --- src/backend/utils/time/tqual.c Wed Oct 1 16:02:52 2003 *************** *** 26,39 **** #include "storage/sinval.h" #include "utils/tqual.h" ! ! static SnapshotData SnapshotDirtyData; ! Snapshot SnapshotDirty = &SnapshotDirtyData; ! static SnapshotData QuerySnapshotData; static SnapshotData SerializableSnapshotData; Snapshot QuerySnapshot = NULL;Snapshot SerializableSnapshot = NULL; /* These are updated by GetSnapshotData: */ TransactionId RecentXmin = InvalidTransactionId; --- 26,44 ---- #include "storage/sinval.h" #include "utils/tqual.h" ! /* ! * The SnapshotData structs are static to simplify memory allocation ! * (see the hack in GetSnapshotData to avoid repeated malloc/free). ! */ static SnapshotData QuerySnapshotData; static SnapshotData SerializableSnapshotData; + static SnapshotData CurrentSnapshotData; + static SnapshotData SnapshotDirtyData; + + /* Externally visible pointers to valid snapshots: */ Snapshot QuerySnapshot = NULL; Snapshot SerializableSnapshot= NULL; + Snapshot SnapshotDirty = &SnapshotDirtyData; /* These are updated by GetSnapshotData: */ TransactionId RecentXmin =InvalidTransactionId; *************** *** 387,396 **** * CurrentCommandId. */ int ! HeapTupleSatisfiesUpdate(HeapTuple htuple, CommandId curcid) { - HeapTupleHeader tuple = htuple->t_data; - if (!(tuple->t_infomask & HEAP_XMIN_COMMITTED)) { if (tuple->t_infomask & HEAP_XMIN_INVALID) --- 392,399 ---- * CurrentCommandId. */ int ! HeapTupleSatisfiesUpdate(HeapTupleHeader tuple, CommandId curcid) { if (!(tuple->t_infomask & HEAP_XMIN_COMMITTED)) { if (tuple->t_infomask & HEAP_XMIN_INVALID) *************** *** 1024,1029 **** --- 1027,1068 ---- } /* + * CopyCurrentSnapshot + * Make a snapshot that is up-to-date as of the current instant, + * and return a copy. + * + * The copy is palloc'd in the current memory context. + */ + Snapshot + CopyCurrentSnapshot(void) + { + Snapshot currentSnapshot; + Snapshot snapshot; + + if (QuerySnapshot == NULL) /* should not be first call in xact */ + elog(ERROR, "no snapshot has been set"); + + /* Update the static struct */ + currentSnapshot = GetSnapshotData(&CurrentSnapshotData, false); + currentSnapshot->curcid = GetCurrentCommandId(); + + /* Make a copy */ + snapshot = (Snapshot) palloc(sizeof(SnapshotData)); + memcpy(snapshot, currentSnapshot, sizeof(SnapshotData)); + if (snapshot->xcnt > 0) + { + snapshot->xip = (TransactionId *) + palloc(snapshot->xcnt * sizeof(TransactionId)); + memcpy(snapshot->xip, currentSnapshot->xip, + snapshot->xcnt * sizeof(TransactionId)); + } + else + snapshot->xip = NULL; + + return snapshot; + } + + /* * FreeXactSnapshot * Free snapshot(s) at end of transaction. */ *************** *** 1031,1038 **** FreeXactSnapshot(void) { /* ! * We do not free(QuerySnapshot->xip); or ! * free(SerializableSnapshot->xip); they will be reused soon */ QuerySnapshot = NULL; SerializableSnapshot= NULL; --- 1070,1078 ---- FreeXactSnapshot(void) { /* ! * We do not free the xip arrays for the snapshot structs; ! * they will be reused soon. So this is now just a state ! * change to prevent outside callers from accessing the snapshots. */ QuerySnapshot = NULL; SerializableSnapshot= NULL; *** src/include/access/heapam.h.orig Mon Sep 15 19:33:43 2003 --- src/include/access/heapam.h Wed Oct 1 15:35:11 2003 *************** *** 155,163 **** extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid); extern int heap_delete(Relationrelation, ItemPointer tid, ItemPointer ctid, ! CommandId cid, bool wait); extern int heap_update(Relation relation, ItemPointer otid, HeapTuple tup, ! ItemPointer ctid, CommandId cid, bool wait); extern int heap_mark4update(Relation relation, HeapTuple tup, Buffer *userbuf, CommandId cid); --- 155,163 ---- extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid); extern int heap_delete(Relationrelation, ItemPointer tid, ItemPointer ctid, ! CommandId cid, Snapshot crosscheck, bool wait); extern int heap_update(Relation relation, ItemPointer otid,HeapTuple tup, ! ItemPointer ctid, CommandId cid, Snapshot crosscheck, bool wait); extern int heap_mark4update(Relation relation,HeapTuple tup, Buffer *userbuf, CommandId cid); *** src/include/executor/executor.h.orig Thu Sep 25 14:58:35 2003 --- src/include/executor/executor.h Wed Oct 1 15:35:05 2003 *************** *** 85,91 **** /* * prototypes from functions in execMain.c */ ! extern void ExecutorStart(QueryDesc *queryDesc, bool useSnapshotNow, bool explainOnly); externTupleTableSlot *ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count); --- 85,91 ---- /* * prototypes from functions in execMain.c */ ! extern void ExecutorStart(QueryDesc *queryDesc, bool useCurrentSnapshot, bool explainOnly); externTupleTableSlot *ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count); *** src/include/executor/spi.h.orig Thu Sep 25 14:58:36 2003 --- src/include/executor/spi.h Wed Oct 1 16:33:39 2003 *************** *** 84,91 **** extern int SPI_exec(const char *src, int tcount); extern int SPI_execp(void *plan, Datum *values, constchar *Nulls, int tcount); ! extern int SPI_execp_now(void *plan, Datum *values, const char *Nulls, ! int tcount); extern void *SPI_prepare(const char *src, int nargs, Oid *argtypes); extern void *SPI_saveplan(void*plan); extern int SPI_freeplan(void *plan); --- 84,91 ---- extern int SPI_exec(const char *src, int tcount); extern int SPI_execp(void *plan, Datum *values, constchar *Nulls, int tcount); ! extern int SPI_execp_current(void *plan, Datum *values, const char *Nulls, ! bool useCurrentSnapshot, int tcount); extern void *SPI_prepare(const char *src, int nargs,Oid *argtypes); extern void *SPI_saveplan(void *plan); extern int SPI_freeplan(void *plan); *** src/include/nodes/execnodes.h.orig Thu Sep 25 14:58:36 2003 --- src/include/nodes/execnodes.h Wed Oct 1 15:35:00 2003 *************** *** 286,292 **** /* Basic state for all query types: */ ScanDirection es_direction; /* current scan direction */ Snapshot es_snapshot; /* time qual to use */ ! CommandId es_snapshot_cid; /* CommandId component of time qual */ List *es_range_table; /* List ofRangeTableEntrys */ /* Info about target table for insert/update/delete queries: */ --- 286,292 ---- /* Basic state for all query types: */ ScanDirection es_direction; /* current scan direction */ Snapshot es_snapshot; /* time qual to use */ ! Snapshot es_crosscheck_snapshot; /* crosscheck time qual for RI */ List *es_range_table; /* List ofRangeTableEntrys */ /* Info about target table for insert/update/delete queries: */ *** src/include/utils/tqual.h.orig Thu Sep 25 14:58:36 2003 --- src/include/utils/tqual.h Wed Oct 1 16:02:21 2003 *************** *** 100,106 **** extern bool HeapTupleSatisfiesToast(HeapTupleHeader tuple); extern bool HeapTupleSatisfiesSnapshot(HeapTupleHeadertuple, Snapshot snapshot); ! extern int HeapTupleSatisfiesUpdate(HeapTuple tuple, CommandId curcid); extern HTSV_Result HeapTupleSatisfiesVacuum(HeapTupleHeadertuple, TransactionId OldestXmin); --- 100,106 ---- extern bool HeapTupleSatisfiesToast(HeapTupleHeader tuple); extern bool HeapTupleSatisfiesSnapshot(HeapTupleHeadertuple, Snapshot snapshot); ! extern int HeapTupleSatisfiesUpdate(HeapTupleHeader tuple, CommandId curcid); extern HTSV_ResultHeapTupleSatisfiesVacuum(HeapTupleHeader tuple, TransactionId OldestXmin); *************** *** 108,113 **** --- 108,114 ---- extern Snapshot GetSnapshotData(Snapshot snapshot, bool serializable); extern void SetQuerySnapshot(void);extern Snapshot CopyQuerySnapshot(void); + extern Snapshot CopyCurrentSnapshot(void); extern void FreeXactSnapshot(void); #endif /* TQUAL_H */
pgsql-hackers by date: