Re: invalid tid errors in latest 7.3.4 stable. - Mailing list pgsql-hackers
| From | Tom Lane |
|---|---|
| Subject | Re: invalid tid errors in latest 7.3.4 stable. |
| Date | |
| Msg-id | 9129.1065044188@sss.pgh.pa.us Whole thread Raw |
| In response to | invalid tid errors in latest 7.3.4 stable. (Wade Klaver <archeron@wavefire.com>) |
| Responses | Re: invalid tid errors in latest 7.3.4 stable. |
| List | pgsql-hackers |
Stephan Szabo <sszabo@megazone.bigpanda.com> writes:
> On Tue, 30 Sep 2003, Tom Lane wrote:
>> I think I can implement it and it will act as stated in my proposal.
>> Whether people like the proposed behavior is the big question in my
>> mind.
> I think it's more reasonable than the current behavior or any of
> the others we've hit along the way, and we have to pretty much choose
> now if we want to change it for 7.4.
I've committed the attached patch. One thing I wanted to double-check
with you is that the SELECT FOR UPDATEs done in the noaction cases are
being correctly handled. I think it is correct to do them with the
current snapshot rather than the start-of-transaction snap; do you
agree? Also, I did not propagate the crosscheck support into
heap_mark4update, meaning that these SELECT FOR UPDATEs won't complain
if they find a row that was inserted later than the start of the
serializable transaction. I'm not totally sure if they should or not;
what do you think?
regards, tom lane
*** src/backend/access/heap/heapam.c.orig Thu Sep 25 10:22:54 2003
--- src/backend/access/heap/heapam.c Wed Oct 1 16:02:27 2003
***************
*** 1207,1220 **** * NB: do not call this directly unless you are prepared to deal with * concurrent-update
conditions. Use simple_heap_delete instead. * * Normal, successful return value is HeapTupleMayBeUpdated, which *
actuallymeans we did delete it. Failure return codes are * HeapTupleSelfUpdated, HeapTupleUpdated, or
HeapTupleBeingUpdated
! * (the last only possible if wait == false). */ int heap_delete(Relation relation, ItemPointer tid,
! ItemPointer ctid, CommandId cid, bool wait) { ItemId lp; HeapTupleData tp;
--- 1207,1229 ---- * NB: do not call this directly unless you are prepared to deal with * concurrent-update
conditions. Use simple_heap_delete instead. *
+ * relation - table to be modified
+ * tid - TID of tuple to be deleted
+ * ctid - output parameter, used only for failure case (see below)
+ * cid - delete command ID to use in verifying tuple visibility
+ * crosscheck - if not SnapshotAny, also check tuple against this
+ * wait - true if should wait for any conflicting update to commit/abort
+ * * Normal, successful return value is HeapTupleMayBeUpdated, which * actually means we did delete it. Failure
returncodes are * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
! * (the last only possible if wait == false). On a failure return,
! * *ctid is set to the ctid link of the target tuple (possibly a later
! * version of the row). */ int heap_delete(Relation relation, ItemPointer tid,
! ItemPointer ctid, CommandId cid, Snapshot crosscheck, bool wait) { ItemId lp;
HeapTupleDatatp;
***************
*** 1240,1246 **** tp.t_tableOid = relation->rd_id; l1:
! result = HeapTupleSatisfiesUpdate(&tp, cid); if (result == HeapTupleInvisible) {
--- 1249,1255 ---- tp.t_tableOid = relation->rd_id; l1:
! result = HeapTupleSatisfiesUpdate(tp.t_data, cid); if (result == HeapTupleInvisible) {
***************
*** 1278,1283 ****
--- 1287,1300 ---- else result = HeapTupleUpdated; }
+
+ if (crosscheck != SnapshotAny && result == HeapTupleMayBeUpdated)
+ {
+ /* Perform additional check for serializable RI updates */
+ if (!HeapTupleSatisfiesSnapshot(tp.t_data, crosscheck))
+ result = HeapTupleUpdated;
+ }
+ if (result != HeapTupleMayBeUpdated) { Assert(result == HeapTupleSelfUpdated ||
***************
*** 1378,1384 **** result = heap_delete(relation, tid, &ctid,
! GetCurrentCommandId(), true /* wait for commit */); switch
(result) {
--- 1395,1401 ---- result = heap_delete(relation, tid, &ctid,
! GetCurrentCommandId(), SnapshotAny, true /* wait for commit */);
switch (result) {
***************
*** 1407,1420 **** * NB: do not call this directly unless you are prepared to deal with * concurrent-update
conditions. Use simple_heap_update instead. * * Normal, successful return value is HeapTupleMayBeUpdated, which *
actuallymeans we *did* update it. Failure return codes are * HeapTupleSelfUpdated, HeapTupleUpdated, or
HeapTupleBeingUpdated
! * (the last only possible if wait == false). */ int heap_update(Relation relation, ItemPointer otid, HeapTuple
newtup,
! ItemPointer ctid, CommandId cid, bool wait) { ItemId lp; HeapTupleData oldtup;
--- 1424,1449 ---- * NB: do not call this directly unless you are prepared to deal with * concurrent-update
conditions. Use simple_heap_update instead. *
+ * relation - table to be modified
+ * otid - TID of old tuple to be replaced
+ * newtup - newly constructed tuple data to store
+ * ctid - output parameter, used only for failure case (see below)
+ * cid - update command ID to use in verifying old tuple visibility
+ * crosscheck - if not SnapshotAny, also check old tuple against this
+ * wait - true if should wait for any conflicting update to commit/abort
+ * * Normal, successful return value is HeapTupleMayBeUpdated, which * actually means we *did* update it. Failure
returncodes are * HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
! * (the last only possible if wait == false). On a failure return,
! * *ctid is set to the ctid link of the old tuple (possibly a later
! * version of the row).
! * On success, newtup->t_self is set to the TID where the new tuple
! * was inserted. */ int heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
! ItemPointer ctid, CommandId cid, Snapshot crosscheck, bool wait) { ItemId lp;
HeapTupleDataoldtup;
***************
*** 1450,1456 **** */ l2:
! result = HeapTupleSatisfiesUpdate(&oldtup, cid); if (result == HeapTupleInvisible) {
--- 1479,1485 ---- */ l2:
! result = HeapTupleSatisfiesUpdate(oldtup.t_data, cid); if (result == HeapTupleInvisible) {
***************
*** 1488,1493 ****
--- 1517,1530 ---- else result = HeapTupleUpdated; }
+
+ if (crosscheck != SnapshotAny && result == HeapTupleMayBeUpdated)
+ {
+ /* Perform additional check for serializable RI updates */
+ if (!HeapTupleSatisfiesSnapshot(oldtup.t_data, crosscheck))
+ result = HeapTupleUpdated;
+ }
+ if (result != HeapTupleMayBeUpdated) { Assert(result == HeapTupleSelfUpdated ||
***************
*** 1718,1724 **** result = heap_update(relation, otid, tup, &ctid,
! GetCurrentCommandId(), true /* wait for commit */); switch
(result) {
--- 1755,1761 ---- result = heap_update(relation, otid, tup, &ctid,
! GetCurrentCommandId(), SnapshotAny, true /* wait for commit */);
switch (result) {
***************
*** 1767,1773 **** tuple->t_len = ItemIdGetLength(lp); l3:
! result = HeapTupleSatisfiesUpdate(tuple, cid); if (result == HeapTupleInvisible) {
--- 1804,1810 ---- tuple->t_len = ItemIdGetLength(lp); l3:
! result = HeapTupleSatisfiesUpdate(tuple->t_data, cid); if (result == HeapTupleInvisible) {
*** src/backend/commands/async.c.orig Mon Sep 15 19:33:39 2003
--- src/backend/commands/async.c Wed Oct 1 15:36:02 2003
***************
*** 537,543 **** */ result = heap_update(lRel, &lTuple->t_self, rTuple,
&ctid,
! GetCurrentCommandId(), false /* no wait for
commit*/); switch (result) {
--- 537,543 ---- */ result = heap_update(lRel, &lTuple->t_self, rTuple,
&ctid,
! GetCurrentCommandId(), SnapshotAny, false
/*no wait for commit */); switch (result) {
*** src/backend/executor/execMain.c.orig Thu Sep 25 14:58:35 2003
--- src/backend/executor/execMain.c Wed Oct 1 17:22:30 2003
***************
*** 104,111 **** * field of the QueryDesc is filled in to describe the tuples that will be * returned, and the
internalfields (estate and planstate) are set up. *
! * If useSnapshotNow is true, run the query with SnapshotNow time qual rules
! * instead of the normal use of QuerySnapshot. * * If explainOnly is true, we are not actually intending to run the
plan, * only to set up for EXPLAIN; so skip unwanted side-effects.
--- 104,117 ---- * field of the QueryDesc is filled in to describe the tuples that will be * returned, and the
internalfields (estate and planstate) are set up. *
! * If useCurrentSnapshot is true, run the query with the latest available
! * snapshot, instead of the normal QuerySnapshot. Also, if it's an update
! * or delete query, check that the rows to be updated or deleted would be
! * visible to the normal QuerySnapshot. (This is a special-case behavior
! * needed for referential integrity updates in serializable transactions.
! * We must check all currently-committed rows, but we want to throw a
! * can't-serialize error if any rows that would need updates would not be
! * visible under the normal serializable snapshot.) * * If explainOnly is true, we are not actually intending to
runthe plan, * only to set up for EXPLAIN; so skip unwanted side-effects.
***************
*** 115,121 **** * ---------------------------------------------------------------- */ void
! ExecutorStart(QueryDesc *queryDesc, bool useSnapshotNow, bool explainOnly) { EState *estate;
MemoryContextoldcontext;
--- 121,127 ---- * ---------------------------------------------------------------- */ void
! ExecutorStart(QueryDesc *queryDesc, bool useCurrentSnapshot, bool explainOnly) { EState *estate;
MemoryContextoldcontext;
***************
*** 157,171 **** * the life of this query, even if it outlives the current command and * current snapshot.
*/
! if (useSnapshotNow) {
! estate->es_snapshot = SnapshotNow;
! estate->es_snapshot_cid = GetCurrentCommandId(); } else { estate->es_snapshot =
CopyQuerySnapshot();
! estate->es_snapshot_cid = estate->es_snapshot->curcid; } /*
--- 163,180 ---- * the life of this query, even if it outlives the current command and * current snapshot.
*/
! if (useCurrentSnapshot) {
! /* RI update/delete query --- must use an up-to-date snapshot */
! estate->es_snapshot = CopyCurrentSnapshot();
! /* crosscheck updates/deletes against transaction snapshot */
! estate->es_crosscheck_snapshot = CopyQuerySnapshot(); } else {
+ /* normal query --- use query snapshot, no crosscheck */ estate->es_snapshot = CopyQuerySnapshot();
! estate->es_crosscheck_snapshot = SnapshotAny; } /*
***************
*** 1118,1124 **** tuple.t_self = *((ItemPointer) DatumGetPointer(datum));
test= heap_mark4update(erm->relation, &tuple, &buffer,
! estate->es_snapshot_cid); ReleaseBuffer(buffer);
switch (test) {
--- 1127,1133 ---- tuple.t_self = *((ItemPointer) DatumGetPointer(datum));
test= heap_mark4update(erm->relation, &tuple, &buffer,
! estate->es_snapshot->curcid); ReleaseBuffer(buffer);
switch (test) {
***************
*** 1278,1284 **** if (estate->es_into_relation_descriptor != NULL) {
heap_insert(estate->es_into_relation_descriptor,tuple,
! estate->es_snapshot_cid); IncrAppended(); }
--- 1287,1293 ---- if (estate->es_into_relation_descriptor != NULL) {
heap_insert(estate->es_into_relation_descriptor,tuple,
! estate->es_snapshot->curcid); IncrAppended(); }
***************
*** 1354,1360 **** * insert the tuple */ newId = heap_insert(resultRelationDesc, tuple,
! estate->es_snapshot_cid); IncrAppended(); (estate->es_processed)++;
--- 1363,1369 ---- * insert the tuple */ newId = heap_insert(resultRelationDesc, tuple,
! estate->es_snapshot->curcid); IncrAppended(); (estate->es_processed)++;
***************
*** 1406,1412 **** bool dodelete; dodelete = ExecBRDeleteTriggers(estate, resultRelInfo,
tupleid,
! estate->es_snapshot_cid); if (!dodelete) /* "do nothing"
*/ return;
--- 1415,1421 ---- bool dodelete; dodelete = ExecBRDeleteTriggers(estate, resultRelInfo,
tupleid,
! estate->es_snapshot->curcid); if (!dodelete) /* "do
nothing"*/ return;
***************
*** 1418,1424 **** ldelete:; result = heap_delete(resultRelationDesc, tupleid, &ctid,
! estate->es_snapshot_cid, true /* wait for commit */); switch
(result) {
--- 1427,1434 ---- ldelete:; result = heap_delete(resultRelationDesc, tupleid, &ctid,
! estate->es_snapshot->curcid,
! estate->es_crosscheck_snapshot, true /* wait for commit */);
switch(result) {
***************
*** 1517,1523 **** newtuple = ExecBRUpdateTriggers(estate, resultRelInfo,
tupleid, tuple,
! estate->es_snapshot_cid); if (newtuple == NULL) /* "do nothing"
*/ return;
--- 1527,1533 ---- newtuple = ExecBRUpdateTriggers(estate, resultRelInfo,
tupleid, tuple,
! estate->es_snapshot->curcid); if (newtuple == NULL) /* "do
nothing"*/ return;
***************
*** 1553,1559 **** */ result = heap_update(resultRelationDesc, tupleid, tuple,
&ctid,
! estate->es_snapshot_cid, true /* wait for commit */); switch
(result) {
--- 1563,1570 ---- */ result = heap_update(resultRelationDesc, tupleid, tuple,
&ctid,
! estate->es_snapshot->curcid,
! estate->es_crosscheck_snapshot, true /* wait for commit */);
switch(result) {
***************
*** 2039,2045 **** */ epqstate->es_direction = ForwardScanDirection; epqstate->es_snapshot =
estate->es_snapshot;
! epqstate->es_snapshot_cid = estate->es_snapshot_cid; epqstate->es_range_table = estate->es_range_table;
epqstate->es_result_relations= estate->es_result_relations; epqstate->es_num_result_relations =
estate->es_num_result_relations;
--- 2050,2056 ---- */ epqstate->es_direction = ForwardScanDirection; epqstate->es_snapshot =
estate->es_snapshot;
! epqstate->es_crosscheck_snapshot = estate->es_crosscheck_snapshot; epqstate->es_range_table =
estate->es_range_table; epqstate->es_result_relations = estate->es_result_relations;
epqstate->es_num_result_relations= estate->es_num_result_relations;
*** src/backend/executor/execUtils.c.orig Thu Sep 25 14:58:35 2003
--- src/backend/executor/execUtils.c Wed Oct 1 15:35:55 2003
***************
*** 178,184 **** */ estate->es_direction = ForwardScanDirection; estate->es_snapshot = SnapshotNow;
! estate->es_snapshot_cid = FirstCommandId; estate->es_range_table = NIL; estate->es_result_relations =
NULL;
--- 178,184 ---- */ estate->es_direction = ForwardScanDirection; estate->es_snapshot = SnapshotNow;
! estate->es_crosscheck_snapshot = SnapshotAny; /* means no crosscheck */ estate->es_range_table = NIL;
estate->es_result_relations= NULL;
*** src/backend/executor/nodeSubplan.c.orig Thu Sep 25 14:58:35 2003
--- src/backend/executor/nodeSubplan.c Wed Oct 1 17:22:30 2003
***************
*** 709,715 **** sp_estate->es_tupleTable = ExecCreateTupleTable(ExecCountSlotsNode(subplan->plan) + 10);
sp_estate->es_snapshot = estate->es_snapshot;
! sp_estate->es_snapshot_cid = estate->es_snapshot_cid; sp_estate->es_instrument = estate->es_instrument;
/*
--- 709,715 ---- sp_estate->es_tupleTable = ExecCreateTupleTable(ExecCountSlotsNode(subplan->plan) + 10);
sp_estate->es_snapshot = estate->es_snapshot;
! sp_estate->es_crosscheck_snapshot = estate->es_crosscheck_snapshot; sp_estate->es_instrument =
estate->es_instrument; /*
*** src/backend/executor/nodeSubqueryscan.c.orig Thu Sep 25 14:58:35 2003
--- src/backend/executor/nodeSubqueryscan.c Wed Oct 1 17:22:31 2003
***************
*** 177,183 **** sp_estate->es_tupleTable = ExecCreateTupleTable(ExecCountSlotsNode(node->subplan) + 10);
sp_estate->es_snapshot = estate->es_snapshot;
! sp_estate->es_snapshot_cid = estate->es_snapshot_cid; sp_estate->es_instrument = estate->es_instrument;
/*
--- 177,183 ---- sp_estate->es_tupleTable = ExecCreateTupleTable(ExecCountSlotsNode(node->subplan) + 10);
sp_estate->es_snapshot = estate->es_snapshot;
! sp_estate->es_crosscheck_snapshot = estate->es_crosscheck_snapshot; sp_estate->es_instrument =
estate->es_instrument; /*
*** src/backend/executor/spi.c.orig Thu Sep 25 14:58:35 2003
--- src/backend/executor/spi.c Wed Oct 1 16:33:44 2003
***************
*** 33,43 **** static int _SPI_execute(const char *src, int tcount, _SPI_plan *plan); static int
_SPI_pquery(QueryDesc*queryDesc, bool runit,
! bool useSnapshotNow, int tcount); static int _SPI_execute_plan(_SPI_plan *plan,
Datum *Values, const char *Nulls,
! bool useSnapshotNow, int tcount); static void _SPI_cursor_operation(Portal portal, bool
forward,int count, DestReceiver *dest);
--- 33,43 ---- static int _SPI_execute(const char *src, int tcount, _SPI_plan *plan); static int
_SPI_pquery(QueryDesc*queryDesc, bool runit,
! bool useCurrentSnapshot, int tcount); static int _SPI_execute_plan(_SPI_plan *plan,
Datum *Values, const char *Nulls,
! bool useCurrentSnapshot, int tcount); static void _SPI_cursor_operation(Portal portal,
boolforward, int count, DestReceiver *dest);
***************
*** 245,256 **** } /*
! * SPI_execp_now -- identical to SPI_execp, except that we use SnapshotNow
! * instead of the normal QuerySnapshot. This is currently not documented
! * in spi.sgml because it is only intended for use by RI triggers. */ int
! SPI_execp_now(void *plan, Datum *Values, const char *Nulls, int tcount) { int res;
--- 245,258 ---- } /*
! * SPI_execp_current -- identical to SPI_execp, except that we expose the
! * Executor option to use a current snapshot instead of the normal
! * QuerySnapshot. This is currently not documented in spi.sgml because
! * it is only intended for use by RI triggers. */ int
! SPI_execp_current(void *plan, Datum *Values, const char *Nulls,
! bool useCurrentSnapshot, int tcount) { int res;
***************
*** 264,270 **** if (res < 0) return res;
! res = _SPI_execute_plan((_SPI_plan *) plan, Values, Nulls, true, tcount); _SPI_end_call(true); return
res;
--- 266,273 ---- if (res < 0) return res;
! res = _SPI_execute_plan((_SPI_plan *) plan, Values, Nulls,
! useCurrentSnapshot, tcount); _SPI_end_call(true); return res;
***************
*** 1124,1130 **** static int _SPI_execute_plan(_SPI_plan *plan, Datum *Values, const char *Nulls,
! bool useSnapshotNow, int tcount) { List *query_list_list = plan->qtlist; List
*plan_list= plan->ptlist;
--- 1127,1133 ---- static int _SPI_execute_plan(_SPI_plan *plan, Datum *Values, const char *Nulls,
! bool useCurrentSnapshot, int tcount) { List *query_list_list = plan->qtlist; List
*plan_list = plan->ptlist;
***************
*** 1195,1201 **** { qdesc = CreateQueryDesc(queryTree, planTree, dest,
paramLI, false);
! res = _SPI_pquery(qdesc, true, useSnapshotNow, queryTree->canSetTag
?tcount : 0); if (res < 0) return res;
--- 1198,1204 ---- { qdesc = CreateQueryDesc(queryTree, planTree, dest,
paramLI, false);
! res = _SPI_pquery(qdesc, true, useCurrentSnapshot,
queryTree->canSetTag? tcount : 0); if (res < 0) return res;
***************
*** 1208,1214 **** } static int
! _SPI_pquery(QueryDesc *queryDesc, bool runit, bool useSnapshotNow, int tcount) { int operation =
queryDesc->operation; int res;
--- 1211,1218 ---- } static int
! _SPI_pquery(QueryDesc *queryDesc, bool runit,
! bool useCurrentSnapshot, int tcount) { int operation = queryDesc->operation; int
res;
***************
*** 1245,1251 **** ResetUsage(); #endif
! ExecutorStart(queryDesc, useSnapshotNow, false); ExecutorRun(queryDesc, ForwardScanDirection, (long)
tcount);
--- 1249,1255 ---- ResetUsage(); #endif
! ExecutorStart(queryDesc, useCurrentSnapshot, false); ExecutorRun(queryDesc, ForwardScanDirection, (long)
tcount);
*** src/backend/storage/ipc/sinval.c.orig Wed Sep 24 14:54:01 2003
--- src/backend/storage/ipc/sinval.c Wed Oct 1 16:02:42 2003
***************
*** 330,339 **** * lastBackend would be sufficient. But it seems better to do the * malloc while not holding
thelock, so we can't look at lastBackend. *
! * if (snapshot->xip != NULL) no need to free and reallocate xip;
! *
! * We can reuse the old xip array, because MaxBackends does not change at
! * runtime. */ if (snapshot->xip == NULL) {
--- 330,339 ---- * lastBackend would be sufficient. But it seems better to do the * malloc while not holding
thelock, so we can't look at lastBackend. *
! * This does open a possibility for avoiding repeated malloc/free:
! * since MaxBackends does not change at runtime, we can simply reuse
! * the previous xip array if any. (This relies on the fact that all
! * calls pass static SnapshotData structs.) */ if (snapshot->xip == NULL) {
*** src/backend/utils/adt/ri_triggers.c.orig Mon Sep 29 13:44:31 2003
--- src/backend/utils/adt/ri_triggers.c Wed Oct 1 16:49:40 2003
***************
*** 157,162 ****
--- 157,163 ---- static bool ri_PerformCheck(RI_QueryKey *qkey, void *qplan, Relation fk_rel, Relation
pk_rel, HeapTuple old_tuple, HeapTuple new_tuple,
+ bool detectNewRows, int expect_OK, const char *constrname); static void
ri_ExtractValues(RI_QueryKey*qkey, int key_idx, Relation rel, HeapTuple tuple,
***************
*** 276,281 ****
--- 277,283 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel,
NULL,NULL,
+ false, SPI_OK_SELECT,
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
***************
*** 433,438 ****
--- 435,441 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel, NULL,
new_row,
+ false, SPI_OK_SELECT, tgargs[RI_CONSTRAINT_NAME_ARGNO]);
***************
*** 594,599 ****
--- 597,603 ---- result = ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel,
old_row, NULL,
+ true, /* treat like update */ SPI_OK_SELECT, NULL);
if (SPI_finish() != SPI_OK_FINISH)
***************
*** 752,757 ****
--- 756,762 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel,
old_row, NULL,
+ true, /* must detect new rows */ SPI_OK_SELECT,
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
***************
*** 942,947 ****
--- 947,953 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel,
old_row, NULL,
+ true, /* must detect new rows */ SPI_OK_SELECT,
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
***************
*** 1102,1107 ****
--- 1108,1114 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel,
old_row, NULL,
+ true, /* must detect new rows */ SPI_OK_DELETE,
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
***************
*** 1285,1290 ****
--- 1292,1298 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel,
old_row, new_row,
+ true, /* must detect new rows */ SPI_OK_UPDATE,
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
***************
*** 1453,1458 ****
--- 1461,1467 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel,
old_row, NULL,
+ true, /* must detect new rows */ SPI_OK_SELECT,
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
***************
*** 1633,1638 ****
--- 1642,1648 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel,
old_row, NULL,
+ true, /* must detect new rows */ SPI_OK_SELECT,
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
***************
*** 1802,1807 ****
--- 1812,1818 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel,
old_row, NULL,
+ true, /* must detect new rows */ SPI_OK_UPDATE,
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
***************
*** 2019,2024 ****
--- 2030,2036 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel,
old_row, NULL,
+ true, /* must detect new rows */ SPI_OK_UPDATE,
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
***************
*** 2188,2193 ****
--- 2200,2206 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel,
old_row, NULL,
+ true, /* must detect new rows */ SPI_OK_UPDATE,
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
***************
*** 2392,2397 ****
--- 2405,2411 ---- ri_PerformCheck(&qkey, qplan, fk_rel, pk_rel,
old_row, NULL,
+ true, /* must detect new rows */ SPI_OK_UPDATE,
tgargs[RI_CONSTRAINT_NAME_ARGNO]);
***************
*** 2788,2798 ****
--- 2802,2814 ---- ri_PerformCheck(RI_QueryKey *qkey, void *qplan, Relation fk_rel, Relation pk_rel,
HeapTuple old_tuple, HeapTuple new_tuple,
+ bool detectNewRows, int expect_OK, const char *constrname) { Relation
query_rel, source_rel; int key_idx;
+ bool useCurrentSnapshot; int limit; int spi_result; AclId
save_uid;
***************
*** 2842,2850 **** vals, nulls); }
! /* Switch to proper UID to perform check as */
! save_uid = GetUserId();
! SetUserId(RelationGetForm(query_rel)->relowner); /* * If this is a select query (e.g., for a 'no
action'or 'restrict'
--- 2858,2882 ---- vals, nulls); }
! /*
! * In READ COMMITTED mode, we just need to make sure the regular query
! * snapshot is up-to-date, and we will see all rows that could be
! * interesting. In SERIALIZABLE mode, we can't update the regular query
! * snapshot. If the caller passes detectNewRows == false then it's okay
! * to do the query with the transaction snapshot; otherwise we tell the
! * executor to force a current snapshot (and error out if it finds any
! * rows under current snapshot that wouldn't be visible per the
! * transaction snapshot).
! */
! if (XactIsoLevel == XACT_SERIALIZABLE)
! {
! useCurrentSnapshot = detectNewRows;
! }
! else
! {
! SetQuerySnapshot();
! useCurrentSnapshot = false;
! } /* * If this is a select query (e.g., for a 'no action' or 'restrict'
***************
*** 2854,2872 **** */ limit = (expect_OK == SPI_OK_SELECT) ? 1 : 0;
! /*
! * Run the plan, using SnapshotNow time qual rules so that we can see
! * all committed tuples, even those committed after our own transaction
! * or query started.
! */
! spi_result = SPI_execp_now(qplan, vals, nulls, limit); /* Restore UID */ SetUserId(save_uid); /*
Checkresult */ if (spi_result < 0)
! elog(ERROR, "SPI_execp_now returned %d", spi_result); if (expect_OK >= 0 && spi_result != expect_OK)
ri_ReportViolation(qkey, constrname ? constrname : "",
--- 2886,2905 ---- */ limit = (expect_OK == SPI_OK_SELECT) ? 1 : 0;
! /* Switch to proper UID to perform check as */
! save_uid = GetUserId();
! SetUserId(RelationGetForm(query_rel)->relowner);
!
! /* Finally we can run the query. */
! spi_result = SPI_execp_current(qplan, vals, nulls,
! useCurrentSnapshot, limit); /* Restore UID */ SetUserId(save_uid);
/*Check result */ if (spi_result < 0)
! elog(ERROR, "SPI_execp_current returned %d", spi_result); if (expect_OK >= 0 && spi_result != expect_OK)
ri_ReportViolation(qkey, constrname ? constrname : "",
*** src/backend/utils/time/tqual.c.orig Thu Sep 25 14:58:35 2003
--- src/backend/utils/time/tqual.c Wed Oct 1 16:02:52 2003
***************
*** 26,39 **** #include "storage/sinval.h" #include "utils/tqual.h"
!
! static SnapshotData SnapshotDirtyData;
! Snapshot SnapshotDirty = &SnapshotDirtyData;
! static SnapshotData QuerySnapshotData; static SnapshotData SerializableSnapshotData; Snapshot QuerySnapshot =
NULL;Snapshot SerializableSnapshot = NULL; /* These are updated by GetSnapshotData: */ TransactionId RecentXmin =
InvalidTransactionId;
--- 26,44 ---- #include "storage/sinval.h" #include "utils/tqual.h"
! /*
! * The SnapshotData structs are static to simplify memory allocation
! * (see the hack in GetSnapshotData to avoid repeated malloc/free).
! */ static SnapshotData QuerySnapshotData; static SnapshotData SerializableSnapshotData;
+ static SnapshotData CurrentSnapshotData;
+ static SnapshotData SnapshotDirtyData;
+
+ /* Externally visible pointers to valid snapshots: */ Snapshot QuerySnapshot = NULL; Snapshot
SerializableSnapshot= NULL;
+ Snapshot SnapshotDirty = &SnapshotDirtyData; /* These are updated by GetSnapshotData: */ TransactionId RecentXmin
=InvalidTransactionId;
***************
*** 387,396 **** * CurrentCommandId. */ int
! HeapTupleSatisfiesUpdate(HeapTuple htuple, CommandId curcid) {
- HeapTupleHeader tuple = htuple->t_data;
- if (!(tuple->t_infomask & HEAP_XMIN_COMMITTED)) { if (tuple->t_infomask & HEAP_XMIN_INVALID)
--- 392,399 ---- * CurrentCommandId. */ int
! HeapTupleSatisfiesUpdate(HeapTupleHeader tuple, CommandId curcid) { if (!(tuple->t_infomask &
HEAP_XMIN_COMMITTED)) { if (tuple->t_infomask & HEAP_XMIN_INVALID)
***************
*** 1024,1029 ****
--- 1027,1068 ---- } /*
+ * CopyCurrentSnapshot
+ * Make a snapshot that is up-to-date as of the current instant,
+ * and return a copy.
+ *
+ * The copy is palloc'd in the current memory context.
+ */
+ Snapshot
+ CopyCurrentSnapshot(void)
+ {
+ Snapshot currentSnapshot;
+ Snapshot snapshot;
+
+ if (QuerySnapshot == NULL) /* should not be first call in xact */
+ elog(ERROR, "no snapshot has been set");
+
+ /* Update the static struct */
+ currentSnapshot = GetSnapshotData(&CurrentSnapshotData, false);
+ currentSnapshot->curcid = GetCurrentCommandId();
+
+ /* Make a copy */
+ snapshot = (Snapshot) palloc(sizeof(SnapshotData));
+ memcpy(snapshot, currentSnapshot, sizeof(SnapshotData));
+ if (snapshot->xcnt > 0)
+ {
+ snapshot->xip = (TransactionId *)
+ palloc(snapshot->xcnt * sizeof(TransactionId));
+ memcpy(snapshot->xip, currentSnapshot->xip,
+ snapshot->xcnt * sizeof(TransactionId));
+ }
+ else
+ snapshot->xip = NULL;
+
+ return snapshot;
+ }
+
+ /* * FreeXactSnapshot * Free snapshot(s) at end of transaction. */
***************
*** 1031,1038 **** FreeXactSnapshot(void) { /*
! * We do not free(QuerySnapshot->xip); or
! * free(SerializableSnapshot->xip); they will be reused soon */ QuerySnapshot = NULL;
SerializableSnapshot= NULL;
--- 1070,1078 ---- FreeXactSnapshot(void) { /*
! * We do not free the xip arrays for the snapshot structs;
! * they will be reused soon. So this is now just a state
! * change to prevent outside callers from accessing the snapshots. */ QuerySnapshot = NULL;
SerializableSnapshot= NULL;
*** src/include/access/heapam.h.orig Mon Sep 15 19:33:43 2003
--- src/include/access/heapam.h Wed Oct 1 15:35:11 2003
***************
*** 155,163 **** extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid); extern int
heap_delete(Relationrelation, ItemPointer tid, ItemPointer ctid,
! CommandId cid, bool wait); extern int heap_update(Relation relation, ItemPointer otid, HeapTuple tup,
! ItemPointer ctid, CommandId cid, bool wait); extern int heap_mark4update(Relation relation, HeapTuple
tup, Buffer *userbuf, CommandId cid);
--- 155,163 ---- extern Oid heap_insert(Relation relation, HeapTuple tup, CommandId cid); extern int
heap_delete(Relationrelation, ItemPointer tid, ItemPointer ctid,
! CommandId cid, Snapshot crosscheck, bool wait); extern int heap_update(Relation relation, ItemPointer
otid,HeapTuple tup,
! ItemPointer ctid, CommandId cid, Snapshot crosscheck, bool wait); extern int heap_mark4update(Relation
relation,HeapTuple tup, Buffer *userbuf, CommandId cid);
*** src/include/executor/executor.h.orig Thu Sep 25 14:58:35 2003
--- src/include/executor/executor.h Wed Oct 1 15:35:05 2003
***************
*** 85,91 **** /* * prototypes from functions in execMain.c */
! extern void ExecutorStart(QueryDesc *queryDesc, bool useSnapshotNow, bool explainOnly);
externTupleTableSlot *ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
--- 85,91 ---- /* * prototypes from functions in execMain.c */
! extern void ExecutorStart(QueryDesc *queryDesc, bool useCurrentSnapshot, bool explainOnly);
externTupleTableSlot *ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count);
*** src/include/executor/spi.h.orig Thu Sep 25 14:58:36 2003
--- src/include/executor/spi.h Wed Oct 1 16:33:39 2003
***************
*** 84,91 **** extern int SPI_exec(const char *src, int tcount); extern int SPI_execp(void *plan, Datum *values,
constchar *Nulls, int tcount);
! extern int SPI_execp_now(void *plan, Datum *values, const char *Nulls,
! int tcount); extern void *SPI_prepare(const char *src, int nargs, Oid *argtypes); extern void
*SPI_saveplan(void*plan); extern int SPI_freeplan(void *plan);
--- 84,91 ---- extern int SPI_exec(const char *src, int tcount); extern int SPI_execp(void *plan, Datum *values,
constchar *Nulls, int tcount);
! extern int SPI_execp_current(void *plan, Datum *values, const char *Nulls,
! bool useCurrentSnapshot, int tcount); extern void *SPI_prepare(const char *src, int
nargs,Oid *argtypes); extern void *SPI_saveplan(void *plan); extern int SPI_freeplan(void *plan);
*** src/include/nodes/execnodes.h.orig Thu Sep 25 14:58:36 2003
--- src/include/nodes/execnodes.h Wed Oct 1 15:35:00 2003
***************
*** 286,292 **** /* Basic state for all query types: */ ScanDirection es_direction; /* current scan direction
*/ Snapshot es_snapshot; /* time qual to use */
! CommandId es_snapshot_cid; /* CommandId component of time qual */ List *es_range_table; /* List
ofRangeTableEntrys */ /* Info about target table for insert/update/delete queries: */
--- 286,292 ---- /* Basic state for all query types: */ ScanDirection es_direction; /* current scan direction
*/ Snapshot es_snapshot; /* time qual to use */
! Snapshot es_crosscheck_snapshot; /* crosscheck time qual for RI */ List *es_range_table; /* List
ofRangeTableEntrys */ /* Info about target table for insert/update/delete queries: */
*** src/include/utils/tqual.h.orig Thu Sep 25 14:58:36 2003
--- src/include/utils/tqual.h Wed Oct 1 16:02:21 2003
***************
*** 100,106 **** extern bool HeapTupleSatisfiesToast(HeapTupleHeader tuple); extern bool
HeapTupleSatisfiesSnapshot(HeapTupleHeadertuple, Snapshot snapshot);
! extern int HeapTupleSatisfiesUpdate(HeapTuple tuple, CommandId curcid); extern HTSV_Result
HeapTupleSatisfiesVacuum(HeapTupleHeadertuple, TransactionId OldestXmin);
--- 100,106 ---- extern bool HeapTupleSatisfiesToast(HeapTupleHeader tuple); extern bool
HeapTupleSatisfiesSnapshot(HeapTupleHeadertuple, Snapshot snapshot);
! extern int HeapTupleSatisfiesUpdate(HeapTupleHeader tuple, CommandId curcid); extern
HTSV_ResultHeapTupleSatisfiesVacuum(HeapTupleHeader tuple, TransactionId OldestXmin);
***************
*** 108,113 ****
--- 108,114 ---- extern Snapshot GetSnapshotData(Snapshot snapshot, bool serializable); extern void
SetQuerySnapshot(void);extern Snapshot CopyQuerySnapshot(void);
+ extern Snapshot CopyCurrentSnapshot(void); extern void FreeXactSnapshot(void); #endif /* TQUAL_H */
pgsql-hackers by date: