*** a/src/backend/access/common/heaptuple.c
--- b/src/backend/access/common/heaptuple.c
***************
*** 60,65 ****
--- 60,66 ----
  #include "access/sysattr.h"
  #include "access/tuptoaster.h"
  #include "executor/tuptable.h"
+ #include "utils/datum.h"
  
  
  /* Does att's datatype allow packing into the 1-byte-header varlena format? */
***************
*** 69,74 ****
--- 70,80 ----
  #define VARLENA_ATT_IS_PACKABLE(att) \
  	((att)->attstorage != 'p')
  
+ /* WAL Diff update options: command bytes of a delta-encoded update record */
+ #define HEAP_UPDATE_WAL_OPT_COPY 0	/* copy old-tuple bytes up to an offset */
+ #define HEAP_UPDATE_WAL_OPT_ADD 1	/* append literal bytes from the record */
+ #define HEAP_UPDATE_WAL_OPT_IGN 2	/* skip a byte range of the old tuple */
+ #define HEAP_UPDATE_WAL_OPT_PAD 3	/* alignment padding in the new tuple */
  
  /* ----------------------------------------------------------------
   *						misc support routines
***************
*** 618,623 **** heap_copytuple_with_tuple(HeapTuple src, HeapTuple dest)
--- 624,1140 ----
  }
  
  /*
+  * Check if the specified attribute's value is same in both given tuples.
+  * Subroutine for HeapSatisfiesHOTUpdate and heap_delta_encode.
+  *
+  * Returns true only when both values are NULL or are binary-identical;
+  * whole-row references and system attributes other than OID and tableOID
+  * always compare as "not equal".
+  */
+ bool
+ heap_tuple_attr_equals(TupleDesc tupdesc, int attrnum,
+ 					   HeapTuple tup1, HeapTuple tup2)
+ {
+ 	Datum		value1,
+ 				value2;
+ 	bool		isnull1,
+ 				isnull2;
+ 	Form_pg_attribute att;
+ 
+ 	/*
+ 	 * If it's a whole-tuple reference, say "not equal".  It's not really
+ 	 * worth supporting this case, since it could only succeed after a no-op
+ 	 * update, which is hardly a case worth optimizing for.
+ 	 */
+ 	if (attrnum == 0)
+ 		return false;
+ 
+ 	/*
+ 	 * Likewise, automatically say "not equal" for any system attribute other
+ 	 * than OID and tableOID; we cannot expect these to be consistent in a HOT
+ 	 * chain, or even to be set correctly yet in the new tuple.
+ 	 */
+ 	if (attrnum < 0)
+ 	{
+ 		if (attrnum != ObjectIdAttributeNumber &&
+ 			attrnum != TableOidAttributeNumber)
+ 			return false;
+ 	}
+ 
+ 	/*
+ 	 * Extract the corresponding values.  XXX this is pretty inefficient if
+ 	 * there are many indexed columns.  Should HeapSatisfiesHOTUpdate do a
+ 	 * single heap_deform_tuple call on each tuple, instead?  But that doesn't
+ 	 * work for system columns ...
+ 	 */
+ 	value1 = heap_getattr(tup1, attrnum, tupdesc, &isnull1);
+ 	value2 = heap_getattr(tup2, attrnum, tupdesc, &isnull2);
+ 
+ 	/*
+ 	 * If one value is NULL and other is not, then they are certainly not
+ 	 * equal
+ 	 */
+ 	if (isnull1 != isnull2)
+ 		return false;
+ 
+ 	/*
+ 	 * If both are NULL, they can be considered equal.
+ 	 */
+ 	if (isnull1)
+ 		return true;
+ 
+ 	/*
+ 	 * We do simple binary comparison of the two datums.  This may be overly
+ 	 * strict because there can be multiple binary representations for the
+ 	 * same logical value.  But we should be OK as long as there are no false
+ 	 * positives.  Using a type-specific equality operator is messy because
+ 	 * there could be multiple notions of equality in different operator
+ 	 * classes; furthermore, we cannot safely invoke user-defined functions
+ 	 * while holding exclusive buffer lock.
+ 	 */
+ 	if (attrnum <= 0)
+ 	{
+ 		/* The only allowed system columns are OIDs, so do this */
+ 		return (DatumGetObjectId(value1) == DatumGetObjectId(value2));
+ 	}
+ 	else
+ 	{
+ 		Assert(attrnum <= tupdesc->natts);
+ 		att = tupdesc->attrs[attrnum - 1];
+ 		return datumIsEqual(value1, value2, att->attbyval, att->attlen);
+ 	}
+ }
+ 
+ 
+ /*
+  * get_tuple_info - Gets the tuple offset and value.
+  *
+  * Fetches the value of attribute "attnum" and computes the offset at which
+  * that attribute ends in the tuple, based on the attribute number and the
+  * state left behind by the previously fetched attribute.
+  *
+  * offset (I/P and O/P variable) - On input, the end offset of the previous
+  *		attribute (zero for the first attribute).  On output, the end
+  *		offset of the current attribute; unchanged if it is NULL.
+  * usecacheoff (I/P and O/P variable) - Tracks whether cached attribute
+  *		offsets may be used.  Note the sense: despite its name, a true
+  *		value means attcacheoff can NOT be used any more.
+  */
+ static void
+ get_tuple_info(Form_pg_attribute *att, HeapTuple tuple, bits8 *bp,
+ 			   bool hasnulls, int attnum, Datum *value, uint16 *offset,
+ 			   bool *usecacheoff)
+ {
+ 	Form_pg_attribute attr = att[attnum];
+ 	HeapTupleHeader hdr = tuple->t_data;
+ 	char	   *base = (char *) hdr + hdr->t_hoff;	/* start of user data */
+ 	uint16		pos = *offset;
+ 	bool		nocache = *usecacheoff;
+ 
+ 	/*
+ 	 * A NULL attribute stores nothing: leave *offset as it was and only
+ 	 * record that cached offsets are unusable from here on.
+ 	 */
+ 	if (hasnulls && att_isnull(attnum, bp))
+ 	{
+ 		*offset = pos;
+ 		*usecacheoff = true;
+ 		return;
+ 	}
+ 
+ 	if (!nocache && attr->attcacheoff >= 0)
+ 	{
+ 		/* a valid cached offset exists, use it directly */
+ 		pos = attr->attcacheoff;
+ 	}
+ 	else if (attr->attlen != -1)
+ 	{
+ 		/* not varlena, so nominal alignment is always correct */
+ 		pos = att_align_nominal(pos, attr->attalign);
+ 
+ 		if (!nocache)
+ 			attr->attcacheoff = pos;
+ 	}
+ 	else if (!nocache &&
+ 			 pos == att_align_nominal(pos, attr->attalign))
+ 	{
+ 		/*
+ 		 * Varlena whose offset is already suitably aligned: no pad bytes
+ 		 * can precede it whether the value is stored aligned or unaligned,
+ 		 * so the offset is safe to cache.
+ 		 */
+ 		attr->attcacheoff = pos;
+ 	}
+ 	else
+ 	{
+ 		/* varlena needing a look at the stored bytes to decide alignment */
+ 		pos = att_align_pointer(pos, attr->attalign, -1,
+ 								base + pos);
+ 		nocache = true;
+ 	}
+ 
+ 	*value = fetchatt(attr, base + pos);
+ 
+ 	/* step over the attribute to find where it ends */
+ 	pos = att_addlength_pointer(pos, attr->attlen, base + pos);
+ 
+ 	/* anything after a variable-width attribute has no fixed offset */
+ 	if (attr->attlen <= 0)
+ 		nocache = true;
+ 
+ 	*offset = pos;
+ 	*usecacheoff = nocache;
+ }
+ 
+ 
+ /*
+  * heap_delta_encode
+  *		Forms a diff tuple from old and new tuple with the modified columns.
+  *
+  * tupleDesc - tuple descriptor supplying the attribute list.
+  * oldtup - pointer to the old tuple.
+  * heaptup - pointer to the modified tuple.
+  * wal_tup - pointer to the wal record which needs to be formed from old
+  *		and new tuples by using the modified columns list.
+  *
+  * On entry wal_tup->t_data must point to a buffer able to hold at least
+  * heaptup->t_len bytes; the encoder never writes past that many bytes.
+  *
+  * Returns true when a delta shorter than the new tuple was produced; then
+  * wal_tup->t_data holds the new tuple's header followed by the command
+  * stream and wal_tup->t_len is the encoded length.  Returns false when the
+  * delta would not be smaller; then *wal_tup is made a copy of *heaptup so
+  * that the caller logs the complete new tuple.
+  */
+ bool
+ heap_delta_encode(TupleDesc tupleDesc, HeapTuple oldtup,
+ 				  HeapTuple heaptup, HeapTuple wal_tup)
+ {
+ 	Form_pg_attribute *att = tupleDesc->attrs;
+ 	int			numberOfAttributes;
+ 	uint16		cur_offset = 0,
+ 				prev_offset = 0,
+ 				offset = 0;
+ 	int			attnum;
+ 	HeapTupleHeader newtuphdr = heaptup->t_data;
+ 	bits8	   *new_bp = newtuphdr->t_bits,
+ 			   *old_bp = oldtup->t_data->t_bits;
+ 	bool		old_hasnulls = HeapTupleHasNulls(oldtup);
+ 	bool		new_hasnulls = HeapTupleHasNulls(heaptup);
+ 	bool		cur_usecacheoff = false,
+ 				prev_usecacheoff = false;
+ 	Datum		cur_value,
+ 				prev_value;
+ 	uint16		data_length;
+ 	bool		check_for_padding = false;
+ 	char	   *data;
+ 	uint16		wal_offset = 0;
+ 
+ 	/*
+ 	 * Upper bound on the command bytes (excluding ADD payload) that one
+ 	 * attribute can generate: COPY, IGN and the ADD header take at most 4
+ 	 * bytes each (command + alignment + uint16), PAD takes 2.
+ 	 */
+ 	const uint16 max_cmd_bytes = 16;
+ 
+ 	numberOfAttributes = HeapTupleHeaderGetNatts(newtuphdr);
+ 
+ 	data = (char *) wal_tup->t_data;
+ 	wal_offset = newtuphdr->t_hoff;
+ 
+ 	/* Copy the tuple header to the WAL tuple */
+ 	memcpy(data, heaptup->t_data, wal_offset);
+ 
+ 	for (attnum = 0; attnum < numberOfAttributes; attnum++)
+ 	{
+ 		/*
+ 		 * Stop as soon as the delta cannot end up smaller than the new
+ 		 * tuple.  This also guarantees that the command writes below stay
+ 		 * within heaptup->t_len bytes of the output buffer.
+ 		 */
+ 		if (wal_offset + max_cmd_bytes >= heaptup->t_len)
+ 			goto no_delta;
+ 
+ 		/*
+ 		 * If the attribute is modified by the update operation, store the
+ 		 * appropriate offsets in the WAL record, otherwise skip to the next
+ 		 * attribute.
+ 		 */
+ 		if (!heap_tuple_attr_equals(tupleDesc, (attnum + 1), oldtup, heaptup))
+ 		{
+ 			check_for_padding = true;
+ 
+ 			/*
+ 			 * calculate the offset where the modified attribute starts in the
+ 			 * old tuple used to store in the WAL record, this will be used to
+ 			 * traverse the old tuple during recovery.
+ 			 */
+ 			if (prev_offset)
+ 			{
+ 				*(uint8 *) (data + wal_offset) = HEAP_UPDATE_WAL_OPT_COPY;
+ 				wal_offset += sizeof(uint8);
+ 
+ 				wal_offset = SHORTALIGN(wal_offset);
+ 
+ 				*(uint16 *) (data + wal_offset) = prev_offset;
+ 				wal_offset += sizeof(uint16);
+ 			}
+ 
+ 			/* calculate the old tuple field length which needs to be ignored */
+ 			offset = prev_offset;
+ 			get_tuple_info(att, oldtup, old_bp, old_hasnulls, attnum,
+ 						   &prev_value, &prev_offset, &prev_usecacheoff);
+ 
+ 			data_length = prev_offset - offset;
+ 
+ 			if (data_length)
+ 			{
+ 				*(uint8 *) (data + wal_offset) = HEAP_UPDATE_WAL_OPT_IGN;
+ 				wal_offset += sizeof(uint8);
+ 
+ 				wal_offset = SHORTALIGN(wal_offset);
+ 
+ 				*(uint16 *) (data + wal_offset) = data_length;
+ 				wal_offset += sizeof(uint16);
+ 			}
+ 
+ 			/*
+ 			 * If the new value is NULL it occupies no storage, not even
+ 			 * alignment padding, so cur_offset must not be advanced.  The
+ 			 * get_tuple_info call only records that cached offsets are no
+ 			 * longer usable for the new tuple.
+ 			 */
+ 			if (new_hasnulls && att_isnull(attnum, new_bp))
+ 			{
+ 				get_tuple_info(att, heaptup, new_bp, new_hasnulls, attnum,
+ 							   &cur_value, &cur_offset, &cur_usecacheoff);
+ 				continue;
+ 			}
+ 
+ 			/*
+ 			 * calculate the new tuple field start position to check whether
+ 			 * any padding is required or not.
+ 			 */
+ 			offset = cur_offset;
+ 			cur_offset = att_align_pointer(cur_offset,
+ 						   att[attnum]->attalign, att[attnum]->attlen,
+ 						(char *) newtuphdr + newtuphdr->t_hoff + cur_offset);
+ 
+ 			/* number of alignment pad bytes preceding the new value */
+ 			data_length = cur_offset - offset;
+ 
+ 			offset = cur_offset;
+ 			get_tuple_info(att, heaptup, new_bp, new_hasnulls, attnum,
+ 						   &cur_value, &cur_offset, &cur_usecacheoff);
+ 
+ 			/* Add the padding if required */
+ 			if (data_length)
+ 			{
+ 				*(uint8 *) (data + wal_offset) = HEAP_UPDATE_WAL_OPT_PAD;
+ 				wal_offset += sizeof(uint8);
+ 
+ 				*(uint8 *) (data + wal_offset) = data_length;
+ 				wal_offset += sizeof(uint8);
+ 			}
+ 
+ 			/* emit the ADD command: length followed by the attribute bytes */
+ 			*(uint8 *) (data + wal_offset) = HEAP_UPDATE_WAL_OPT_ADD;
+ 			wal_offset += sizeof(uint8);
+ 
+ 			wal_offset = SHORTALIGN(wal_offset);
+ 
+ 			data_length = cur_offset - offset;
+ 			*(uint16 *) (data + wal_offset) = data_length;
+ 			wal_offset += sizeof(uint16);
+ 
+ 			/* give up rather than write the payload past the break-even point */
+ 			if (wal_offset + data_length >= heaptup->t_len)
+ 				goto no_delta;
+ 
+ 			if (att[attnum]->attbyval)
+ 			{
+ 				/* pass-by-value */
+ 				char		tempdata[sizeof(Datum)];
+ 
+ 				/*
+ 				 * Here we are not storing the data as aligned in the WAL
+ 				 * record as we don't have the tuple descriptor while
+ 				 * replaying the xlog.
+ 				 *
+ 				 * The alignment of the data is taken care of while framing
+ 				 * the tuple during heap_xlog_update.
+ 				 */
+ 				store_att_byval(tempdata,
+ 								cur_value,
+ 								att[attnum]->attlen);
+ 				memcpy((data + wal_offset), tempdata, att[attnum]->attlen);
+ 			}
+ 			else
+ 			{
+ 				memcpy((data + wal_offset),
+ 					   DatumGetPointer(cur_value),
+ 					   data_length);
+ 			}
+ 
+ 			wal_offset += data_length;
+ 		}
+ 		else
+ 		{
+ 			/*
+ 			 * An unmodified attribute that is NULL in the new tuple is NULL
+ 			 * in the old tuple too (heap_tuple_attr_equals said "equal").
+ 			 * It occupies no storage in either tuple, so neither offset may
+ 			 * be aligned or advanced.
+ 			 */
+ 			if (new_hasnulls && att_isnull(attnum, new_bp))
+ 			{
+ 				get_tuple_info(att, oldtup, old_bp, old_hasnulls, attnum,
+ 							   &prev_value, &prev_offset, &prev_usecacheoff);
+ 
+ 				get_tuple_info(att, heaptup, new_bp, new_hasnulls, attnum,
+ 							   &cur_value, &cur_offset, &cur_usecacheoff);
+ 
+ 				check_for_padding = false;
+ 				continue;
+ 			}
+ 
+ 			/*
+ 			 * calculate the old tuple field start position, required to
+ 			 * ignore if any alignment is present.
+ 			 */
+ 			offset = prev_offset;
+ 			prev_offset = att_align_pointer(prev_offset,
+ 						   att[attnum]->attalign, att[attnum]->attlen,
+ 			 (char *) oldtup->t_data + oldtup->t_data->t_hoff + prev_offset);
+ 
+ 			data_length = prev_offset - offset;
+ 
+ 			/*
+ 			 * calculate the new tuple field start position to check whether
+ 			 * any padding is required or not because of field alignment.
+ 			 */
+ 			offset = cur_offset;
+ 			cur_offset = att_align_pointer(cur_offset,
+ 						   att[attnum]->attalign, att[attnum]->attlen,
+ 						(char *) newtuphdr + newtuphdr->t_hoff + cur_offset);
+ 
+ 			/* old and new tuples need different padding before this field */
+ 			if (data_length != (cur_offset - offset))
+ 			{
+ 				if (!check_for_padding)
+ 				{
+ 					*(uint8 *) (data + wal_offset) = HEAP_UPDATE_WAL_OPT_COPY;
+ 					wal_offset += sizeof(uint8);
+ 
+ 					wal_offset = SHORTALIGN(wal_offset);
+ 
+ 					*(uint16 *) (data + wal_offset) = (prev_offset - data_length);
+ 					wal_offset += sizeof(uint16);
+ 				}
+ 
+ 				if (data_length)
+ 				{
+ 					*(uint8 *) (data + wal_offset) = HEAP_UPDATE_WAL_OPT_IGN;
+ 					wal_offset += sizeof(uint8);
+ 
+ 					wal_offset = SHORTALIGN(wal_offset);
+ 
+ 					*(uint16 *) (data + wal_offset) = data_length;
+ 					wal_offset += sizeof(uint16);
+ 				}
+ 
+ 				data_length = cur_offset - offset;
+ 
+ 				if (data_length)
+ 				{
+ 					*(uint8 *) (data + wal_offset) = HEAP_UPDATE_WAL_OPT_PAD;
+ 					wal_offset += sizeof(uint8);
+ 
+ 					*(uint8 *) (data + wal_offset) = data_length;
+ 					wal_offset += sizeof(uint8);
+ 				}
+ 			}
+ 
+ 			get_tuple_info(att, oldtup, old_bp, old_hasnulls, attnum,
+ 						   &prev_value, &prev_offset, &prev_usecacheoff);
+ 
+ 			get_tuple_info(att, heaptup, new_bp, new_hasnulls, attnum,
+ 						   &cur_value, &cur_offset, &cur_usecacheoff);
+ 
+ 			check_for_padding = false;
+ 		}
+ 	}
+ 
+ 	if (wal_offset < heaptup->t_len)
+ 	{
+ 		wal_tup->t_len = wal_offset;
+ 		wal_tup->t_self = heaptup->t_self;
+ 		wal_tup->t_tableOid = heaptup->t_tableOid;
+ 
+ 		return true;
+ 	}
+ 
+ no_delta:
+ 
+ 	/*
+ 	 * No benefit from delta encoding.  Copy the whole HeapTupleData struct
+ 	 * (not just sizeof(HeapTuple), which is only the size of a pointer), so
+ 	 * that wal_tup carries the new tuple's length, TID, table OID and data
+ 	 * pointer.
+ 	 */
+ 	*wal_tup = *heaptup;
+ 	return false;
+ }
+ 
+ /*
+  * heap_delta_decode
+  *		deforms a diff tuple and forms the new tuple with the help of old tuple.
+  *
+  * The WAL record data is in the format as below
+  *
+  *	COPY + offset until copy required
+  *	IGN + length needs to be ignored from the old tuple.
+  *	PAD + length needs to padded with zero in new tuple.
+  *	ADD + length of data + data which is modified.
+  *
+  * For the COPY command, copy the specified length from old tuple.
+  *
+  * Once the old tuple data is copied, both the old-tuple and new-tuple
+  * offsets are advanced by the copied length.
+  *
+  * For the IGN command, ignore the specified length in the old tuple.
+  *
+  * For the PAD command, fill with zeros of the specified length in
+  * the new tuple.
+  *
+  * For the ADD command, copy the corresponding length of data from WAL
+  * record to the new tuple.
+  *
+  * Repeat this procedure until the WAL record reaches the end.
+  *
+  * Any old tuple data still remaining after that is copied at the end.
+  *
+  * The uint16 operands follow their command byte at the next 2-byte boundary
+  * counted from the start of waldata.  waldata itself is not assumed to be
+  * 2-byte aligned in memory, so operands are fetched with memcpy.
+  *
+  * htup - old tuple data pointer from which new tuple needs to be formed.
+  * old_tup_len - old tuple length (including its header).
+  * data - pointer to the new tuple's data area which needs to be framed.
+  * new_tup_len - output of new tuple data length.
+  * waldata - wal record pointer from which the new tuple needs to formed.
+  * wal_len - wal record length.
+  */
+ void
+ heap_delta_decode(HeapTupleHeader htup, uint32 old_tup_len, char *data,
+ 				  uint32 *new_tup_len, char *waldata, uint32 wal_len)
+ {
+ 	uint8		command;
+ 	uint16		len = 0,
+ 				data_length,
+ 				operand,
+ 				prev_offset = 0,
+ 				cur_offset = 0;
+ 	char	   *olddata = (char *) htup + htup->t_hoff;
+ 
+ 	/* from here on old_tup_len counts only the old tuple's data bytes */
+ 	old_tup_len -= htup->t_hoff;
+ 
+ 	/* Frame the new tuple from the old and WAL tuples */
+ 	while (len < wal_len)
+ 	{
+ 		command = *(uint8 *) (waldata + len);
+ 		len += sizeof(uint8);
+ 
+ 		switch (command)
+ 		{
+ 			case HEAP_UPDATE_WAL_OPT_COPY:
+ 				len = SHORTALIGN(len);
+ 				memcpy(&operand, waldata + len, sizeof(uint16));
+ 				len += sizeof(uint16);
+ 
+ 				/* operand is the old-tuple offset to copy up to */
+ 				if (operand < prev_offset || operand > old_tup_len)
+ 					elog(PANIC, "heap_delta_decode: invalid copy offset %u",
+ 						 (unsigned int) operand);
+ 				data_length = operand - prev_offset;
+ 
+ 				/* Copy the old tuple data */
+ 				memcpy((data + cur_offset),
+ 					   (olddata + prev_offset),
+ 					   data_length);
+ 				cur_offset += data_length;
+ 				prev_offset += data_length;
+ 				break;
+ 			case HEAP_UPDATE_WAL_OPT_ADD:
+ 				len = SHORTALIGN(len);
+ 				memcpy(&data_length, waldata + len, sizeof(uint16));
+ 				len += sizeof(uint16);
+ 
+ 				if ((uint32) len + data_length > wal_len)
+ 					elog(PANIC, "heap_delta_decode: invalid add length %u",
+ 						 (unsigned int) data_length);
+ 
+ 				/* Copy the modified attribute data from WAL record */
+ 				memcpy((data + cur_offset), (waldata + len), data_length);
+ 				cur_offset += data_length;
+ 				len += data_length;
+ 				break;
+ 			case HEAP_UPDATE_WAL_OPT_IGN:
+ 				len = SHORTALIGN(len);
+ 				memcpy(&data_length, waldata + len, sizeof(uint16));
+ 				len += sizeof(uint16);
+ 
+ 				/* Skip the oldtuple with data length in the WAL record */
+ 				prev_offset += data_length;
+ 				break;
+ 			case HEAP_UPDATE_WAL_OPT_PAD:
+ 				data_length = *(uint8 *) (waldata + len);
+ 				len += sizeof(uint8);
+ 
+ 				/* padding must really be zero, as the format promises */
+ 				memset((data + cur_offset), 0, data_length);
+ 				cur_offset += data_length;
+ 				break;
+ 			default:
+ 				/* must not be silently skipped in non-assert builds */
+ 				elog(PANIC, "heap_delta_decode: invalid command %u",
+ 					 (unsigned int) command);
+ 				break;
+ 		}
+ 	}
+ 
+ 	/* Copy the remaining old tuple data to the new tuple */
+ 	if (prev_offset < old_tup_len)
+ 	{
+ 		memcpy((data + cur_offset),
+ 			   (olddata + prev_offset),
+ 			   (old_tup_len - prev_offset));
+ 		cur_offset += (old_tup_len - prev_offset);
+ 	}
+ 
+ 	*new_tup_len = cur_offset;
+ }
+ 
+ 
+ /*
   * heap_form_tuple
   *		construct a tuple from the given values[] and isnull[] arrays,
   *		which are of the length indicated by tupleDescriptor->natts
*** a/src/backend/access/heap/heapam.c
--- b/src/backend/access/heap/heapam.c
***************
*** 71,77 ****
  #include "utils/syscache.h"
  #include "utils/tqual.h"
  
- 
  /* GUC variable */
  bool		synchronize_seqscans = true;
  
--- 71,76 ----
***************
*** 85,91 ****
  static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
  					TransactionId xid, CommandId cid, int options);
  static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
  				ItemPointerData from, Buffer newbuf, HeapTuple newtup,
! 				bool all_visible_cleared, bool new_all_visible_cleared);
  static bool HeapSatisfiesHOTUpdate(Relation relation,
  					   Bitmapset *hot_attrs, HeapTuple oldtup, HeapTuple newtup);
--- 84,91 ----
  					TransactionId xid, CommandId cid, int options);
  static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
  				ItemPointerData from, Buffer newbuf, HeapTuple newtup,
! 				bool all_visible_cleared, bool new_all_visible_cleared,
! 
bool diff_update); static bool HeapSatisfiesHOTUpdate(Relation relation, Bitmapset *hot_attrs, HeapTuple oldtup, HeapTuple newtup); *************** *** 2737,2742 **** heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, --- 2737,2743 ---- ItemId lp; HeapTupleData oldtup; HeapTuple heaptup; + HeapTupleData wal_tup; Page page; BlockNumber block; Buffer buffer, *************** *** 2752,2757 **** heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, --- 2753,2763 ---- bool use_hot_update = false; bool all_visible_cleared = false; bool all_visible_cleared_new = false; + struct + { + HeapTupleHeaderData hdr; + char data[MaxHeapTupleSize]; + } tbuf; Assert(ItemPointerIsValid(otid)); *************** *** 3195,3204 **** l2: /* XLOG stuff */ if (RelationNeedsWAL(relation)) { ! XLogRecPtr recptr = log_heap_update(relation, buffer, oldtup.t_self, ! newbuf, heaptup, ! all_visible_cleared, ! all_visible_cleared_new); if (newbuf != buffer) { --- 3201,3232 ---- /* XLOG stuff */ if (RelationNeedsWAL(relation)) { ! XLogRecPtr recptr; ! bool is_delta_update = false; ! ! /* ! * Apply the xlog diff update algorithm only for hot updates. ! */ ! if (!need_toast && (newbuf == buffer)) ! { ! wal_tup.t_data = (HeapTupleHeader) &tbuf; ! is_delta_update = heap_delta_encode(relation->rd_att, &oldtup, ! heaptup, &wal_tup); ! ! recptr = log_heap_update(relation, buffer, oldtup.t_self, ! newbuf, &wal_tup, ! all_visible_cleared, ! all_visible_cleared_new, ! is_delta_update); ! } ! else ! { ! recptr = log_heap_update(relation, buffer, oldtup.t_self, ! newbuf, heaptup, ! all_visible_cleared, ! all_visible_cleared_new, ! is_delta_update); ! } if (newbuf != buffer) { *************** *** 3258,3341 **** l2: } /* - * Check if the specified attribute's value is same in both given tuples. - * Subroutine for HeapSatisfiesHOTUpdate. 
- */ - static bool - heap_tuple_attr_equals(TupleDesc tupdesc, int attrnum, - HeapTuple tup1, HeapTuple tup2) - { - Datum value1, - value2; - bool isnull1, - isnull2; - Form_pg_attribute att; - - /* - * If it's a whole-tuple reference, say "not equal". It's not really - * worth supporting this case, since it could only succeed after a no-op - * update, which is hardly a case worth optimizing for. - */ - if (attrnum == 0) - return false; - - /* - * Likewise, automatically say "not equal" for any system attribute other - * than OID and tableOID; we cannot expect these to be consistent in a HOT - * chain, or even to be set correctly yet in the new tuple. - */ - if (attrnum < 0) - { - if (attrnum != ObjectIdAttributeNumber && - attrnum != TableOidAttributeNumber) - return false; - } - - /* - * Extract the corresponding values. XXX this is pretty inefficient if - * there are many indexed columns. Should HeapSatisfiesHOTUpdate do a - * single heap_deform_tuple call on each tuple, instead? But that doesn't - * work for system columns ... - */ - value1 = heap_getattr(tup1, attrnum, tupdesc, &isnull1); - value2 = heap_getattr(tup2, attrnum, tupdesc, &isnull2); - - /* - * If one value is NULL and other is not, then they are certainly not - * equal - */ - if (isnull1 != isnull2) - return false; - - /* - * If both are NULL, they can be considered equal. - */ - if (isnull1) - return true; - - /* - * We do simple binary comparison of the two datums. This may be overly - * strict because there can be multiple binary representations for the - * same logical value. But we should be OK as long as there are no false - * positives. Using a type-specific equality operator is messy because - * there could be multiple notions of equality in different operator - * classes; furthermore, we cannot safely invoke user-defined functions - * while holding exclusive buffer lock. 
- */ - if (attrnum <= 0) - { - /* The only allowed system columns are OIDs, so do this */ - return (DatumGetObjectId(value1) == DatumGetObjectId(value2)); - } - else - { - Assert(attrnum <= tupdesc->natts); - att = tupdesc->attrs[attrnum - 1]; - return datumIsEqual(value1, value2, att->attbyval, att->attlen); - } - } - - /* * Check if the old and new tuples represent a HOT-safe update. To be able * to do a HOT update, we must not have changed any columns used in index * definitions. --- 3286,3291 ---- *************** *** 4429,4435 **** log_heap_visible(RelFileNode rnode, BlockNumber block, Buffer vm_buffer, static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from, Buffer newbuf, HeapTuple newtup, ! bool all_visible_cleared, bool new_all_visible_cleared) { xl_heap_update xlrec; xl_heap_header xlhdr; --- 4379,4386 ---- static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from, Buffer newbuf, HeapTuple newtup, ! bool all_visible_cleared, bool new_all_visible_cleared, ! bool delta_update) { xl_heap_update xlrec; xl_heap_header xlhdr; *************** *** 4446,4456 **** log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from, else info = XLOG_HEAP_UPDATE; xlrec.target.node = reln->rd_node; xlrec.target.tid = from; ! xlrec.all_visible_cleared = all_visible_cleared; xlrec.newtid = newtup->t_self; ! xlrec.new_all_visible_cleared = new_all_visible_cleared; rdata[0].data = (char *) &xlrec; rdata[0].len = SizeOfHeapUpdate; --- 4397,4412 ---- else info = XLOG_HEAP_UPDATE; + xlrec.flags = 0; xlrec.target.node = reln->rd_node; xlrec.target.tid = from; ! if (all_visible_cleared) ! xlrec.flags |= XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED; xlrec.newtid = newtup->t_self; ! if (new_all_visible_cleared) ! xlrec.flags |= XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED; ! if (delta_update) ! 
xlrec.flags |= XL_HEAP_UPDATE_DELTA_ENCODED; rdata[0].data = (char *) &xlrec; rdata[0].len = SizeOfHeapUpdate; *************** *** 5232,5237 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) --- 5188,5195 ---- OffsetNumber offnum; ItemId lp = NULL; HeapTupleHeader htup; + HeapTupleHeader oldtup = NULL; + uint32 old_tup_len = 0; struct { HeapTupleHeaderData hdr; *************** *** 5246,5252 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ ! if (xlrec->all_visible_cleared) { Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); BlockNumber block = ItemPointerGetBlockNumber(&xlrec->target.tid); --- 5204,5210 ---- * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ ! if (xlrec->flags & XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); BlockNumber block = ItemPointerGetBlockNumber(&xlrec->target.tid); *************** *** 5289,5295 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) elog(PANIC, "heap_update_redo: invalid lp"); ! htup = (HeapTupleHeader) PageGetItem(page, lp); htup->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID | --- 5247,5254 ---- if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsNormal(lp)) elog(PANIC, "heap_update_redo: invalid lp"); ! oldtup = htup = (HeapTupleHeader) PageGetItem(page, lp); ! old_tup_len = ItemIdGetLength(lp); htup->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID | *************** *** 5308,5314 **** heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) /* Mark the page as a candidate for pruning */ PageSetPrunable(page, record->xl_xid); ! 
if (xlrec->all_visible_cleared) PageClearAllVisible(page); /* --- 5267,5273 ---- /* Mark the page as a candidate for pruning */ PageSetPrunable(page, record->xl_xid); ! if (xlrec->flags & XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); /* *************** *** 5330,5336 **** newt:; * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ ! if (xlrec->new_all_visible_cleared) { Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); BlockNumber block = ItemPointerGetBlockNumber(&xlrec->newtid); --- 5289,5295 ---- * The visibility map may need to be fixed even if the heap page is * already up-to-date. */ ! if (xlrec->flags & XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED) { Relation reln = CreateFakeRelcacheEntry(xlrec->target.node); BlockNumber block = ItemPointerGetBlockNumber(&xlrec->newtid); *************** *** 5380,5395 **** newsame:; hsize = SizeOfHeapUpdate + SizeOfHeapHeader; newlen = record->xl_len - hsize; ! Assert(newlen <= MaxHeapTupleSize); memcpy((char *) &xlhdr, (char *) xlrec + SizeOfHeapUpdate, SizeOfHeapHeader); htup = &tbuf.hdr; MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData)); ! /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */ ! memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits), ! (char *) xlrec + hsize, ! newlen); newlen += offsetof(HeapTupleHeaderData, t_bits); htup->t_infomask2 = xlhdr.t_infomask2; htup->t_infomask = xlhdr.t_infomask; --- 5339,5386 ---- hsize = SizeOfHeapUpdate + SizeOfHeapHeader; newlen = record->xl_len - hsize; ! memcpy((char *) &xlhdr, (char *) xlrec + SizeOfHeapUpdate, SizeOfHeapHeader); htup = &tbuf.hdr; MemSet((char *) htup, 0, sizeof(HeapTupleHeaderData)); ! ! /* ! * If the new tuple was delta-encoded, decode it. ! */ ! if (xlrec->flags & XL_HEAP_UPDATE_DELTA_ENCODED) ! { ! int bitmap_len; ! char *data = (char *) &tbuf.hdr; ! uint32 wal_len; ! char *waldata; ! ! Assert(oldtup != NULL); ! ! 
bitmap_len = (xlhdr.t_hoff - offsetof(HeapTupleHeaderData, t_bits)); ! wal_len = record->xl_len - hsize - bitmap_len; ! waldata = (char *) xlrec + hsize + bitmap_len; ! ! /* copy exactly the tuple header present in the WAL to new tuple */ ! memcpy(data + offsetof(HeapTupleHeaderData, t_bits), ! (char *) xlrec + hsize, ! bitmap_len); ! ! data += xlhdr.t_hoff; ! ! heap_delta_decode(oldtup, old_tup_len, data, &newlen, waldata, wal_len); ! newlen += bitmap_len; ! } ! else ! { ! Assert(newlen <= MaxHeapTupleSize); ! /* PG73FORMAT: get bitmap [+ padding] [+ oid] + data */ ! memcpy((char *) htup + offsetof(HeapTupleHeaderData, t_bits), ! (char *) xlrec + hsize, ! newlen); ! } ! newlen += offsetof(HeapTupleHeaderData, t_bits); htup->t_infomask2 = xlhdr.t_infomask2; htup->t_infomask = xlhdr.t_infomask; *************** *** 5404,5410 **** newsame:; if (offnum == InvalidOffsetNumber) elog(PANIC, "heap_update_redo: failed to add tuple"); ! if (xlrec->new_all_visible_cleared) PageClearAllVisible(page); freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ --- 5395,5401 ---- if (offnum == InvalidOffsetNumber) elog(PANIC, "heap_update_redo: failed to add tuple"); ! if (xlrec->flags & XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED) PageClearAllVisible(page); freespace = PageGetHeapFreeSpace(page); /* needed to update FSM below */ *** a/src/include/access/heapam_xlog.h --- b/src/include/access/heapam_xlog.h *************** *** 142,153 **** typedef struct xl_heap_update { xl_heaptid target; /* deleted tuple id */ ItemPointerData newtid; /* new inserted tuple id */ ! bool all_visible_cleared; /* PD_ALL_VISIBLE was cleared */ ! bool new_all_visible_cleared; /* same for the page of newtid */ /* NEW TUPLE xl_heap_header AND TUPLE DATA FOLLOWS AT END OF STRUCT */ } xl_heap_update; ! 
#define SizeOfHeapUpdate (offsetof(xl_heap_update, new_all_visible_cleared) + sizeof(bool)) /* * This is what we need to know about vacuum page cleanup/redirect --- 142,157 ---- { xl_heaptid target; /* deleted tuple id */ ItemPointerData newtid; /* new inserted tuple id */ ! char flags; ! /* NEW TUPLE xl_heap_header AND TUPLE DATA FOLLOWS AT END OF STRUCT */ } xl_heap_update; ! #define XL_HEAP_UPDATE_ALL_VISIBLE_CLEARED 0x01 ! #define XL_HEAP_UPDATE_NEW_ALL_VISIBLE_CLEARED 0x02 ! #define XL_HEAP_UPDATE_DELTA_ENCODED 0x04 ! ! #define SizeOfHeapUpdate (offsetof(xl_heap_update, flags) + sizeof(char)) /* * This is what we need to know about vacuum page cleanup/redirect *** a/src/include/access/htup_details.h --- b/src/include/access/htup_details.h *************** *** 527,532 **** struct MinimalTupleData --- 527,538 ---- #define HeapTupleSetOid(tuple, oid) \ HeapTupleHeaderSetOid((tuple)->t_data, (oid)) + /* + * Minimum tuple length required by the tuple during update operation for doing + * WAL optimization of update operation. + */ + #define MinHeapTupleSizeForDeltaUpdate 64 + /* ---------------- * fastgetattr *************** *** 636,641 **** extern HeapTuple heap_modify_tuple(HeapTuple tuple, --- 642,654 ---- extern void heap_deform_tuple(HeapTuple tuple, TupleDesc tupleDesc, Datum *values, bool *isnull); + extern bool heap_tuple_attr_equals(TupleDesc tupdesc, int attrnum, + HeapTuple tup1, HeapTuple tup2); + extern bool heap_delta_encode(TupleDesc tupleDesc, HeapTuple oldtup, + HeapTuple heaptup, HeapTuple wal_tup); + extern void heap_delta_decode(HeapTupleHeader htup, uint32 old_tup_len, + char *data, uint32 *new_tup_len, char *waldata, uint32 wal_len); + /* these three are deprecated versions of the three above: */ extern HeapTuple heap_formtuple(TupleDesc tupleDescriptor, Datum *values, char *nulls);