diff --git a/contrib/amcheck/verify_nbtree.c b/contrib/amcheck/verify_nbtree.c index 28ea2f211b..ed7caf662d 100644 --- a/contrib/amcheck/verify_nbtree.c +++ b/contrib/amcheck/verify_nbtree.c @@ -617,7 +617,7 @@ bt_check_level_from_leftmost(BtreeCheckState *state, BtreeLevel level) /* Internal page -- downlink gets leftmost on next level */ itemid = PageGetItemId(state->target, P_FIRSTDATAKEY(opaque)); itup = (IndexTuple) PageGetItem(state->target, itemid); - nextleveldown.leftmost = ItemPointerGetBlockNumber(&(itup->t_tid)); + nextleveldown.leftmost = ItemPointerGetBlockNumberNoCheck(&(itup->t_tid)); nextleveldown.level = opaque->btpo.level - 1; } else @@ -722,6 +722,39 @@ bt_target_page_check(BtreeCheckState *state) elog(DEBUG2, "verifying %u items on %s block %u", max, P_ISLEAF(topaque) ? "leaf" : "internal", state->targetblock); + + /* Check the number of attributes in high key if any */ + if (!P_RIGHTMOST(topaque)) + { + if (!_bt_check_natts(state->rel, state->target, P_HIKEY)) + { + ItemId itemid; + IndexTuple itup; + char *itid, + *htid; + + itemid = PageGetItemId(state->target, P_HIKEY); + itup = (IndexTuple) PageGetItem(state->target, itemid); + itid = psprintf("(%u,%u)", state->targetblock, P_HIKEY); + htid = psprintf("(%u,%u)", + ItemPointerGetBlockNumberNoCheck(&(itup->t_tid)), + ItemPointerGetOffsetNumberNoCheck(&(itup->t_tid))); + + ereport(ERROR, + (errcode(ERRCODE_INDEX_CORRUPTED), + errmsg("wrong number of index tuple attributes for index \"%s\"", + RelationGetRelationName(state->rel)), + errdetail_internal("Index tid=%s natts=%u points to %s tid=%s page lsn=%X/%X.", + itid, + BTreeTupGetNAtts(itup, state->rel), + P_ISLEAF(topaque) ? "heap" : "index", + htid, + (uint32) (state->targetlsn >> 32), + (uint32) state->targetlsn))); + } + } + + /* * Loop over page items, starting from first non-highkey item, not high * key (if any). Also, immediately skip "negative infinity" real item (if @@ -760,6 +793,30 @@ bt_target_page_check(BtreeCheckState *state) (uint32) state->targetlsn), errhint("This could be a torn page problem"))); + /* Check the number of index tuple attributes */ + if (!_bt_check_natts(state->rel, state->target, offset)) + { + char *itid, + *htid; + + itid = psprintf("(%u,%u)", state->targetblock, offset); + htid = psprintf("(%u,%u)", + ItemPointerGetBlockNumberNoCheck(&(itup->t_tid)), + ItemPointerGetOffsetNumberNoCheck(&(itup->t_tid))); + + ereport(ERROR, + (errcode(ERRCODE_INDEX_CORRUPTED), + errmsg("wrong number of index tuple attributes for index \"%s\"", + RelationGetRelationName(state->rel)), + errdetail_internal("Index tid=%s natts=%u points to %s tid=%s page lsn=%X/%X.", + itid, + BTreeTupGetNAtts(itup, state->rel), + P_ISLEAF(topaque) ? "heap" : "index", + htid, + (uint32) (state->targetlsn >> 32), + (uint32) state->targetlsn))); + } + /* * Don't try to generate scankey using "negative infinity" garbage * data on internal pages @@ -802,8 +859,8 @@ bt_target_page_check(BtreeCheckState *state) itid = psprintf("(%u,%u)", state->targetblock, offset); htid = psprintf("(%u,%u)", - ItemPointerGetBlockNumber(&(itup->t_tid)), - ItemPointerGetOffsetNumber(&(itup->t_tid))); + ItemPointerGetBlockNumberNoCheck(&(itup->t_tid)), + ItemPointerGetOffsetNumberNoCheck(&(itup->t_tid))); ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED), @@ -834,8 +891,8 @@ bt_target_page_check(BtreeCheckState *state) itid = psprintf("(%u,%u)", state->targetblock, offset); htid = psprintf("(%u,%u)", - ItemPointerGetBlockNumber(&(itup->t_tid)), - ItemPointerGetOffsetNumber(&(itup->t_tid))); + ItemPointerGetBlockNumberNoCheck(&(itup->t_tid)), + ItemPointerGetOffsetNumberNoCheck(&(itup->t_tid))); nitid = psprintf("(%u,%u)", state->targetblock, OffsetNumberNext(offset)); @@ -843,8 +900,8 @@ bt_target_page_check(BtreeCheckState *state) itemid = PageGetItemId(state->target, OffsetNumberNext(offset)); itup = (IndexTuple) PageGetItem(state->target, itemid); nhtid = psprintf("(%u,%u)", - ItemPointerGetBlockNumber(&(itup->t_tid)), - ItemPointerGetOffsetNumber(&(itup->t_tid))); + ItemPointerGetBlockNumberNoCheck(&(itup->t_tid)), + ItemPointerGetOffsetNumberNoCheck(&(itup->t_tid))); ereport(ERROR, (errcode(ERRCODE_INDEX_CORRUPTED), @@ -932,7 +989,7 @@ bt_target_page_check(BtreeCheckState *state) */ if (!P_ISLEAF(topaque) && state->readonly) { - BlockNumber childblock = ItemPointerGetBlockNumber(&(itup->t_tid)); + BlockNumber childblock = ItemPointerGetBlockNumberNoCheck(&(itup->t_tid)); bt_downlink_check(state, childblock, skey); } @@ -1336,8 +1393,8 @@ bt_tuple_present_callback(Relation index, HeapTuple htup, Datum *values, ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg("heap tuple (%u,%u) from table \"%s\" lacks matching index tuple within index \"%s\"", - ItemPointerGetBlockNumber(&(itup->t_tid)), - ItemPointerGetOffsetNumber(&(itup->t_tid)), + ItemPointerGetBlockNumberNoCheck(&(itup->t_tid)), + ItemPointerGetOffsetNumberNoCheck(&(itup->t_tid)), RelationGetRelationName(state->heaprel), RelationGetRelationName(state->rel)), !state->readonly @@ -1368,6 +1425,10 @@ offset_is_negative_infinity(BTPageOpaque opaque, OffsetNumber offset) * infinity item is either first or second line item, or there is none * within page. * + * "Negative infinity" tuple is a special corner case of pivot tuples, + * it has zero attributes while rest of pivot tuples have nkeyatts number + * of attributes. + * * Right-most pages don't have a high key, but could be said to * conceptually have a "positive infinity" high key. Thus, there is a * symmetry between down link items in parent pages, and high keys in diff --git a/src/backend/access/common/indextuple.c b/src/backend/access/common/indextuple.c index a58bd95620..ea6ad941ed 100644 --- a/src/backend/access/common/indextuple.c +++ b/src/backend/access/common/indextuple.c @@ -448,8 +448,8 @@ CopyIndexTuple(IndexTuple source) } /* - * Reform index tuple. Truncate nonkey (INCLUDE) attributes. - * Pass the number of attributes the truncated tuple must contain. + * Truncate tailing attributes from given index tuple leaving it with + * new_indnatts number of attributes. */ IndexTuple index_truncate_tuple(Relation idxrel, IndexTuple olditup, int new_indnatts) diff --git a/src/backend/access/nbtree/README b/src/backend/access/nbtree/README index 34f78b2f50..8719f28cf2 100644 --- a/src/backend/access/nbtree/README +++ b/src/backend/access/nbtree/README @@ -590,6 +590,10 @@ original search scankey is consulted as each index entry is sequentially scanned to decide whether to return the entry and whether the scan can stop (see _bt_checkkeys()). +We use term "pivot" index tuples to distinguish tuples which don't point +to heap tuples, but rather used for tree navigation. Pivot tuples includes +all tuples on non-leaf pages and high keys on leaf pages. + Notes About Data Representation ------------------------------- diff --git a/src/backend/access/nbtree/nbtinsert.c b/src/backend/access/nbtree/nbtinsert.c index 0fb751c259..692716d363 100644 --- a/src/backend/access/nbtree/nbtinsert.c +++ b/src/backend/access/nbtree/nbtinsert.c @@ -1202,7 +1202,7 @@ _bt_split(Relation rel, Buffer buf, Buffer cbuf, OffsetNumber firstright, */ if (indnatts != indnkeyatts && P_ISLEAF(lopaque)) { - lefthikey = index_truncate_tuple(rel, item, indnkeyatts); + lefthikey = _bt_truncate_tuple(rel, item); itemsz = IndexTupleSize(lefthikey); itemsz = MAXALIGN(itemsz); } @@ -1824,7 +1824,7 @@ _bt_insert_parent(Relation rel, /* form an index tuple that points at the new right page */ new_item = CopyIndexTuple(ritem); - ItemPointerSet(&(new_item->t_tid), rbknum, P_HIKEY); + ItemPointerSetBlockNumber(&(new_item->t_tid), rbknum); /* * Find the parent buffer and get the parent page. @@ -2093,7 +2093,8 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) left_item_sz = sizeof(IndexTupleData); left_item = (IndexTuple) palloc(left_item_sz); left_item->t_info = left_item_sz; - ItemPointerSet(&(left_item->t_tid), lbkno, P_HIKEY); + ItemPointerSetBlockNumber(&(left_item->t_tid), lbkno); + BTreeTupSetNAtts(left_item, 0); /* * Create downlink item for right page. The key for it is obtained from @@ -2103,7 +2104,7 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf) right_item_sz = ItemIdGetLength(itemid); item = (IndexTuple) PageGetItem(lpage, itemid); right_item = CopyIndexTuple(item); - ItemPointerSet(&(right_item->t_tid), rbkno, P_HIKEY); + ItemPointerSetBlockNumber(&(right_item->t_tid), rbkno); /* NO EREPORT(ERROR) from here till newroot op is logged */ START_CRIT_SECTION(); @@ -2234,6 +2235,7 @@ _bt_pgaddtup(Page page, { trunctuple = *itup; trunctuple.t_info = sizeof(IndexTupleData); + BTreeTupSetNAtts(&trunctuple, 0); itup = &trunctuple; itemsize = sizeof(IndexTupleData); } diff --git a/src/backend/access/nbtree/nbtpage.c b/src/backend/access/nbtree/nbtpage.c index 19085ba40a..22dc968e23 100644 --- a/src/backend/access/nbtree/nbtpage.c +++ b/src/backend/access/nbtree/nbtpage.c @@ -1110,7 +1110,7 @@ _bt_lock_branch_parent(Relation rel, BlockNumber child, BTStack stack, * Locate the downlink of "child" in the parent (updating the stack entry * if needed) */ - ItemPointerSet(&(stack->bts_btentry.t_tid), child, P_HIKEY); + ItemPointerSetBlockNumber(&(stack->bts_btentry.t_tid), child); pbuf = _bt_getstackbuf(rel, stack, BT_WRITE); if (pbuf == InvalidBuffer) elog(ERROR, "failed to re-find parent key in index \"%s\" for deletion target page %u", @@ -1519,15 +1519,15 @@ _bt_mark_page_halfdead(Relation rel, Buffer leafbuf, BTStack stack) #ifdef USE_ASSERT_CHECKING itemid = PageGetItemId(page, topoff); itup = (IndexTuple) PageGetItem(page, itemid); - Assert(ItemPointerGetBlockNumber(&(itup->t_tid)) == target); + Assert(ItemPointerGetBlockNumberNoCheck(&(itup->t_tid)) == target); #endif nextoffset = OffsetNumberNext(topoff); itemid = PageGetItemId(page, nextoffset); itup = (IndexTuple) PageGetItem(page, itemid); - if (ItemPointerGetBlockNumber(&(itup->t_tid)) != rightsib) + if (ItemPointerGetBlockNumberNoCheck(&(itup->t_tid)) != rightsib) elog(ERROR, "right sibling %u of block %u is not next child %u of block %u in index \"%s\"", - rightsib, target, ItemPointerGetBlockNumber(&(itup->t_tid)), + rightsib, target, ItemPointerGetBlockNumberNoCheck(&(itup->t_tid)), BufferGetBlockNumber(topparent), RelationGetRelationName(rel)); /* @@ -1550,7 +1550,7 @@ _bt_mark_page_halfdead(Relation rel, Buffer leafbuf, BTStack stack) itemid = PageGetItemId(page, topoff); itup = (IndexTuple) PageGetItem(page, itemid); - ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY); + ItemPointerSetBlockNumber(&(itup->t_tid), rightsib); nextoffset = OffsetNumberNext(topoff); PageIndexTupleDelete(page, nextoffset); @@ -1569,7 +1569,7 @@ _bt_mark_page_halfdead(Relation rel, Buffer leafbuf, BTStack stack) MemSet(&trunctuple, 0, sizeof(IndexTupleData)); trunctuple.t_info = sizeof(IndexTupleData); if (target != leafblkno) - ItemPointerSet(&trunctuple.t_tid, target, P_HIKEY); + ItemPointerSetBlockNumber(&trunctuple.t_tid, target); else ItemPointerSetInvalid(&trunctuple.t_tid); if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY, @@ -1681,7 +1681,7 @@ _bt_unlink_halfdead_page(Relation rel, Buffer leafbuf, bool *rightsib_empty) */ if (ItemPointerIsValid(leafhikey)) { - target = ItemPointerGetBlockNumber(leafhikey); + target = ItemPointerGetBlockNumberNoCheck(leafhikey); Assert(target != leafblkno); /* fetch the block number of the topmost parent's left sibling */ @@ -1797,7 +1797,7 @@ _bt_unlink_halfdead_page(Relation rel, Buffer leafbuf, bool *rightsib_empty) /* remember the next non-leaf child down in the branch. */ itemid = PageGetItemId(page, P_FIRSTDATAKEY(opaque)); - nextchild = ItemPointerGetBlockNumber(&((IndexTuple) PageGetItem(page, itemid))->t_tid); + nextchild = ItemPointerGetBlockNumberNoCheck(&((IndexTuple) PageGetItem(page, itemid))->t_tid); if (nextchild == leafblkno) nextchild = InvalidBlockNumber; } @@ -1888,7 +1888,7 @@ _bt_unlink_halfdead_page(Relation rel, Buffer leafbuf, bool *rightsib_empty) if (nextchild == InvalidBlockNumber) ItemPointerSetInvalid(leafhikey); else - ItemPointerSet(leafhikey, nextchild, P_HIKEY); + ItemPointerSetBlockNumber(leafhikey, nextchild); } /* diff --git a/src/backend/access/nbtree/nbtsearch.c b/src/backend/access/nbtree/nbtsearch.c index 51dca64e13..bc5c10322a 100644 --- a/src/backend/access/nbtree/nbtsearch.c +++ b/src/backend/access/nbtree/nbtsearch.c @@ -147,7 +147,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey, offnum = _bt_binsrch(rel, *bufP, keysz, scankey, nextkey); itemid = PageGetItemId(page, offnum); itup = (IndexTuple) PageGetItem(page, itemid); - blkno = ItemPointerGetBlockNumber(&(itup->t_tid)); + blkno = ItemPointerGetBlockNumberNoCheck(&(itup->t_tid)); par_blkno = BufferGetBlockNumber(*bufP); /* @@ -443,6 +443,17 @@ _bt_compare(Relation rel, if (!P_ISLEAF(opaque) && offnum == P_FIRSTDATAKEY(opaque)) return 1; + /* + * Check tuple has correct number of attributes. + */ + if (unlikely(!_bt_check_natts(rel, page, offnum))) + { + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("tuple has wrong number of attributes in index \"%s\"", + RelationGetRelationName(rel)))); + } + itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum)); /* @@ -1833,7 +1844,7 @@ _bt_get_endpoint(Relation rel, uint32 level, bool rightmost, offnum = P_FIRSTDATAKEY(opaque); itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum)); - blkno = ItemPointerGetBlockNumber(&(itup->t_tid)); + blkno = ItemPointerGetBlockNumberNoCheck(&(itup->t_tid)); buf = _bt_relandgetbuf(rel, buf, blkno, BT_READ); page = BufferGetPage(buf); @@ -1959,3 +1970,47 @@ _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir) so->numKilled = 0; /* just paranoia */ so->markItemIndex = -1; /* ditto */ } + +/* + * Check if index tuple have appropriate number of attributes. + */ +bool +_bt_check_natts(Relation index, Page page, OffsetNumber offnum) +{ + int16 natts = IndexRelationGetNumberOfAttributes(index); + int16 nkeyatts = IndexRelationGetNumberOfKeyAttributes(index); + ItemId itemid; + IndexTuple itup; + BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page); + + /* + * Assert that mask allocated for number of keys in index tuple can fit + * maximum number of index keys. + */ + StaticAssertStmt(BT_N_KEYS_OFFSET_MASK >= INDEX_MAX_KEYS, + "BT_N_KEYS_OFFSET_MASK can't fit INDEX_MAX_KEYS"); + + itemid = PageGetItemId(page, offnum); + itup = (IndexTuple) PageGetItem(page, itemid); + + if (P_ISLEAF(opaque) && offnum >= P_FIRSTDATAKEY(opaque)) + { + /* + * Regular leaf tuples have as every index attributes + */ + return (BTreeTupGetNAtts(itup, index) == natts); + } + else if (!P_ISLEAF(opaque) && offnum == P_FIRSTDATAKEY(opaque)) + { + /* Leftmost tuples on non-leaf pages have no attributes */ + return (BTreeTupGetNAtts(itup, index) == 0); + } + else + { + /* + * Pivot tuples stored in non-leaf pages and hikeys of leaf pages + * contain only key attributes + */ + return (BTreeTupGetNAtts(itup, index) == nkeyatts); + } +} diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index 96045cb1ea..f34b2ed893 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -752,6 +752,7 @@ _bt_sortaddtup(Page page, { trunctuple = *itup; trunctuple.t_info = sizeof(IndexTupleData); + BTreeTupSetNAtts(&trunctuple, 0); itup = &trunctuple; itemsize = sizeof(IndexTupleData); } @@ -907,7 +908,7 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup) * it will be that in the future. Now the purpose is just to save * more space on inner pages of btree. */ - keytup = index_truncate_tuple(wstate->index, oitup, indnkeyatts); + keytup = _bt_truncate_tuple(wstate->index, oitup); /* delete "wrong" high key, insert keytup as P_HIKEY. */ PageIndexTupleDelete(opage, P_HIKEY); @@ -924,7 +925,7 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup) state->btps_next = _bt_pagestate(wstate, state->btps_level + 1); Assert(state->btps_minkey != NULL); - ItemPointerSet(&(state->btps_minkey->t_tid), oblkno, P_HIKEY); + ItemPointerSetBlockNumber(&(state->btps_minkey->t_tid), oblkno); _bt_buildadd(wstate, state->btps_next, state->btps_minkey); pfree(state->btps_minkey); @@ -978,8 +979,7 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup) * into the parent page as a downlink */ if (indnkeyatts != indnatts && P_ISLEAF(pageop)) - state->btps_minkey = index_truncate_tuple(wstate->index, - itup, indnkeyatts); + state->btps_minkey = _bt_truncate_tuple(wstate->index, itup); else state->btps_minkey = CopyIndexTuple(itup); } @@ -1034,7 +1034,7 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state) else { Assert(s->btps_minkey != NULL); - ItemPointerSet(&(s->btps_minkey->t_tid), blkno, P_HIKEY); + ItemPointerSetBlockNumber(&(s->btps_minkey->t_tid), blkno); _bt_buildadd(wstate, s->btps_next, s->btps_minkey); pfree(s->btps_minkey); s->btps_minkey = NULL; diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c index 2fc5924bf0..149b52e3ad 100644 --- a/src/backend/access/nbtree/nbtutils.c +++ b/src/backend/access/nbtree/nbtutils.c @@ -2078,3 +2078,23 @@ btproperty(Oid index_oid, int attno, return false; /* punt to generic code */ } } + +/* + * _bt_truncate_tuple() -- remove non-key (INCLUDE) attributes from index + * tuple. + * + * Transforms an ordinal B-tree leaf index tuple into pivot tuple to be used + * as hikey or non-leaf page tuple with downlink. Note that t_tid offset + * will be overritten in order to represent number of present tuple attributes. + */ +IndexTuple +_bt_truncate_tuple(Relation idxrel, IndexTuple olditup) +{ + IndexTuple newitup; + int nkeyattrs = IndexRelationGetNumberOfKeyAttributes(idxrel); + + newitup = index_truncate_tuple(idxrel, olditup, nkeyattrs); + BTreeTupSetNAtts(newitup, nkeyattrs); + + return newitup; +} diff --git a/src/backend/access/nbtree/nbtxlog.c b/src/backend/access/nbtree/nbtxlog.c index 9e72425786..ba575c1245 100644 --- a/src/backend/access/nbtree/nbtxlog.c +++ b/src/backend/access/nbtree/nbtxlog.c @@ -614,7 +614,7 @@ btree_xlog_delete_get_latestRemovedXid(XLogReaderState *record) * heap_fetch, since it uses ReadBuffer rather than XLogReadBuffer. * Note that we are not looking at tuple data here, just headers. */ - hoffnum = ItemPointerGetOffsetNumber(&(itup->t_tid)); + hoffnum = ItemPointerGetOffsetNumberNoCheck(&(itup->t_tid)); hitemid = PageGetItemId(hpage, hoffnum); /* @@ -762,11 +762,11 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record) nextoffset = OffsetNumberNext(poffset); itemid = PageGetItemId(page, nextoffset); itup = (IndexTuple) PageGetItem(page, itemid); - rightsib = ItemPointerGetBlockNumber(&itup->t_tid); + rightsib = ItemPointerGetBlockNumberNoCheck(&itup->t_tid); itemid = PageGetItemId(page, poffset); itup = (IndexTuple) PageGetItem(page, itemid); - ItemPointerSet(&(itup->t_tid), rightsib, P_HIKEY); + ItemPointerSetBlockNumber(&(itup->t_tid), rightsib); nextoffset = OffsetNumberNext(poffset); PageIndexTupleDelete(page, nextoffset); @@ -796,7 +796,7 @@ btree_xlog_mark_page_halfdead(uint8 info, XLogReaderState *record) MemSet(&trunctuple, 0, sizeof(IndexTupleData)); trunctuple.t_info = sizeof(IndexTupleData); if (xlrec->topparent != InvalidBlockNumber) - ItemPointerSet(&trunctuple.t_tid, xlrec->topparent, P_HIKEY); + ItemPointerSetBlockNumber(&trunctuple.t_tid, xlrec->topparent); else ItemPointerSetInvalid(&trunctuple.t_tid); if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY, @@ -906,7 +906,7 @@ btree_xlog_unlink_page(uint8 info, XLogReaderState *record) MemSet(&trunctuple, 0, sizeof(IndexTupleData)); trunctuple.t_info = sizeof(IndexTupleData); if (xlrec->topparent != InvalidBlockNumber) - ItemPointerSet(&trunctuple.t_tid, xlrec->topparent, P_HIKEY); + ItemPointerSetBlockNumber(&trunctuple.t_tid, xlrec->topparent); else ItemPointerSetInvalid(&trunctuple.t_tid); if (PageAddItem(page, (Item) &trunctuple, sizeof(IndexTupleData), P_HIKEY, diff --git a/src/include/access/nbtree.h b/src/include/access/nbtree.h index f532f3ffff..646edb521e 100644 --- a/src/include/access/nbtree.h +++ b/src/include/access/nbtree.h @@ -157,11 +157,8 @@ typedef struct BTMetaPageData * as unique identifier for a given index tuple (logical position * within a level). - vadim 04/09/97 */ -#define BTTidSame(i1, i2) \ - ((ItemPointerGetBlockNumber(&(i1)) == ItemPointerGetBlockNumber(&(i2))) && \ - (ItemPointerGetOffsetNumber(&(i1)) == ItemPointerGetOffsetNumber(&(i2)))) #define BTEntrySame(i1, i2) \ - BTTidSame((i1)->t_tid, (i2)->t_tid) + ((ItemPointerGetBlockNumberNoCheck(&(i1)->t_tid) == ItemPointerGetBlockNumberNoCheck(&(i2)->t_tid))) /* @@ -212,6 +209,56 @@ typedef struct BTMetaPageData #define P_FIRSTDATAKEY(opaque) (P_RIGHTMOST(opaque) ? P_HIKEY : P_FIRSTKEY) +/* + * B-tree index with INCLUDE clause has non-key (included) attributes, which + * are used solely in index-only scans. Those non-key attributes are present + * in leaf index tuples which point to corresponding heap tuples. However, + * tree also contains "pivot" tuples. Pivot tuples are used for navigation + * during tree traversal. Pivot tuples include tuples on non-leaf pages and + * high key tuples. Such, tuples don't need to included attributes, because + * they have no use during tree traversal. This is why we truncate them in + * order to save some space. Therefore, B-tree index with INCLUDE clause + * contain tuples with variable number of attributes. + * + * In order to keep on-disk compatibility with upcoming suffix truncation of + * pivot tuples, we store number of attributes present inside tuple itself. + * Thankfully, offset number is always unused in pivot tuple. So, we use free + * bit of index tuple flags as sign that offset have alternative meaning: it + * stores number of keys present in index tuple (12 bit is far enough for that). + * And we have 4 bits reserved for future usage. + * + * It's possible that index tuple has zero attributes (leftmost item of + * iternal page). And we have assertion that offset number is greater or equal + * to 1. This is why we store (number_of_attributes + 1) in offset number. + */ +#define INDEX_ALT_TID_MASK 0x2000 /* flag indicating t_tid offset has + an alternative meaning */ +#define BT_RESERVED_OFFSET_MASK 0xF000 /* mask of bits in t_tid offset + reserved for future usage */ +#define BT_N_KEYS_OFFSET_MASK 0x0FFF /* mask of bits in t_tid offset + holding number of attributes + actually present in index tuple */ + +/* Set number of attributes to B-tree index tuple overriding t_tid offset */ +#define BTreeTupSetNAtts(itup, n) \ + do { \ + (itup)->t_info |= INDEX_ALT_TID_MASK; \ + ItemPointerSetOffsetNumber(&(itup)->t_tid, n); \ + } while(0) + +/* Get number of attributes in B-tree index tuple */ +#define BTreeTupGetNAtts(itup, index) \ + ( \ + (itup)->t_info & INDEX_ALT_TID_MASK ? \ + ( \ + AssertMacro((ItemPointerGetOffsetNumberNoCheck(&(itup)->t_tid) & BT_RESERVED_OFFSET_MASK) == 0), \ + ItemPointerGetOffsetNumberNoCheck(&(itup)->t_tid) & BT_N_KEYS_OFFSET_MASK \ + ) \ + : \ + IndexRelationGetNumberOfAttributes(index) \ + ) + + /* * Operator strategy numbers for B-tree have been moved to access/stratnum.h, * because many places need to use them in ScanKeyInit() calls. @@ -524,6 +571,7 @@ extern bool _bt_first(IndexScanDesc scan, ScanDirection dir); extern bool _bt_next(IndexScanDesc scan, ScanDirection dir); extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost, Snapshot snapshot); +extern bool _bt_check_natts(Relation index, Page page, OffsetNumber offnum); /* * prototypes for functions in nbtutils.c @@ -552,6 +600,7 @@ extern bytea *btoptions(Datum reloptions, bool validate); extern bool btproperty(Oid index_oid, int attno, IndexAMProperty prop, const char *propname, bool *res, bool *isnull); +extern IndexTuple _bt_truncate_tuple(Relation idxrel, IndexTuple olditup); /* * prototypes for functions in nbtvalidate.c