diff --git a/src/backend/access/undo/undoinsert.c b/src/backend/access/undo/undoinsert.c index ccbcc66059..fbc56363c8 100644 --- a/src/backend/access/undo/undoinsert.c +++ b/src/backend/access/undo/undoinsert.c @@ -511,7 +511,7 @@ resize: Assert(AmAttachedToUndoLog(log) || InRecovery); /* - * We can consider that the log as switched if this is the first record of + * We can consider the log as switched if this is the first record of * the log and not the first record of the transaction i.e. same * transaction continued from the previous log. */ @@ -563,7 +563,7 @@ resize: * This is normally used when more than one undo record needs to be prepared. */ void -UndoSetPrepareSize(int nrecords, UnpackedUndoRecord *undorecords, +UndoSetPrepareSize(UnpackedUndoRecord *undorecords, int nrecords, TransactionId xid, UndoPersistence upersistence) { TransactionId txid; @@ -609,8 +609,8 @@ UndoSetPrepareSize(int nrecords, UnpackedUndoRecord *undorecords, * for the top most transactions. */ UndoRecPtr -PrepareUndoInsert(UnpackedUndoRecord *urec, UndoPersistence upersistence, - TransactionId xid) +PrepareUndoInsert(UnpackedUndoRecord *urec, TransactionId xid, + UndoPersistence upersistence) { UndoRecordSize size; UndoRecPtr urecptr; @@ -625,7 +625,7 @@ PrepareUndoInsert(UnpackedUndoRecord *urec, UndoPersistence upersistence, /* Already reached maximum prepared limit. */ if (prepare_idx == max_prepared_undo) - elog(ERROR, "Already reached the maximum prepared limit."); + elog(ERROR, "already reached the maximum prepared limit"); /* * If this is the first undo record for this top transaction add the @@ -637,7 +637,7 @@ PrepareUndoInsert(UnpackedUndoRecord *urec, UndoPersistence upersistence, */ if (xid == InvalidTransactionId) { - /* we expect during recovery, we always have a valid transaction id. */ + /* During recovery, we must have a valid transaction id. 
*/ Assert(!InRecovery); txid = GetTopTransactionId(); } else { @@ -656,6 +656,7 @@ PrepareUndoInsert(UnpackedUndoRecord *urec, UndoPersistence upersistence, else urecptr = prepared_urec_ptr; + /* advance the prepared ptr location for the next record. */ size = UndoRecordExpectedSize(urec); if (UndoRecPtrIsValid(prepared_urec_ptr)) { @@ -686,25 +687,23 @@ PrepareUndoInsert(UnpackedUndoRecord *urec, UndoPersistence upersistence, else cur_size += BLCKSZ - UndoLogBlockHeaderSize; - /* FIXME: Should we just report error ? */ + /* undo record can't use more buffers than MAX_BUFFER_PER_UNDO. */ Assert(index < MAX_BUFFER_PER_UNDO); - /* Keep the track of the buffers we have pinned. */ + /* Keep track of the buffers we have pinned and locked. */ prepared_undo[prepare_idx].undo_buffer_idx[index++] = bufidx; - /* Undo record can not fit into this block so go to the next block. */ - cur_blk++; - /* * If we need more pages they'll be all new so we can definitely skip * reading from disk. */ rbm = RBM_ZERO; + cur_blk++; } while (cur_size < size); /* - * Save referenced of undo record pointer as well as undo record. - * InsertPreparedUndo will use these to insert the prepared record. + * Save the undo record information to be later used by InsertPreparedUndo + * to insert the prepared record. */ prepared_undo[prepare_idx].urec = urec; prepared_undo[prepare_idx].urp = urecptr; @@ -754,11 +753,14 @@ InsertPreparedUndo(void) UnpackedUndoRecord *uur; UndoLogOffset offset; UndoLogControl *log; - uint16 prev_undolen; + /* There must be at least one prepared undo record. */ Assert(prepare_idx > 0); - /* This must be called under a critical section. */ + /* + * This must be called under a critical section or we must be in + * recovery. 
+ */ Assert(InRecovery || CritSectionCount > 0); for (idx = 0; idx < prepare_idx; idx++) @@ -771,16 +773,14 @@ InsertPreparedUndo(void) starting_byte = UndoRecPtrGetPageOffset(urp); offset = UndoRecPtrGetOffset(urp); - /* - * We can read meta.prevlen without locking, because only we can write - * to it. - */ log = UndoLogGet(UndoRecPtrGetLogNo(urp), false); Assert(AmAttachedToUndoLog(log) || InRecovery); - prev_undolen = log->meta.prevlen; - /* store the previous undo record length in the header */ - uur->uur_prevlen = prev_undolen; + /* + * Store the previous undo record length in the header. We can read + * meta.prevlen without locking, because only we can write to it. + */ + uur->uur_prevlen = log->meta.prevlen; /* if starting a new log then there is no prevlen to store */ if (offset == UndoLogBlockHeaderSize) @@ -811,7 +811,7 @@ InsertPreparedUndo(void) /* * Initialize the page whenever we try to write the first record - * in page. + * in page. We start writing immediately after the block header. */ if (starting_byte == UndoLogBlockHeaderSize) PageInit(page, BLCKSZ, 0); @@ -828,22 +828,25 @@ InsertPreparedUndo(void) } MarkBufferDirty(buffer); - starting_byte = UndoLogBlockHeaderSize; - bufidx++; /* * If we are swithing to the next block then consider the header * in total undo length. */ + starting_byte = UndoLogBlockHeaderSize; undo_len += UndoLogBlockHeaderSize; + bufidx++; + /* undo record can't use more buffers than MAX_BUFFER_PER_UNDO. */ Assert(bufidx < MAX_BUFFER_PER_UNDO); } while (true); - prev_undolen = undo_len; - - UndoLogSetPrevLen(UndoRecPtrGetLogNo(urp), prev_undolen); + UndoLogSetPrevLen(UndoRecPtrGetLogNo(urp), undo_len); + /* + * Link the transactions in the same log so that we can discard all + * the transaction's undo log in one-shot. + */ if (UndoRecPtrIsValid(xact_urec_info.urecptr)) UndoRecordUpdateTransInfo(); @@ -856,8 +859,8 @@ InsertPreparedUndo(void) } /* - * Reset the global variables related to undo buffers. 
This is required at the - transaction abort or releasing undo buffers + * Reset the global variables related to undo buffers. This is required at the + * transaction abort and while releasing the undo buffers. */ void ResetUndoBuffers(void) @@ -879,7 +882,7 @@ ResetUndoBuffers(void) /* * max_prepared_undo limit is changed so free the allocated memory and - * reset all the variable back to its default value. + * reset all the variables back to their default values. */ if (max_prepared_undo > MAX_PREPARED_UNDO) { @@ -892,8 +895,8 @@ ResetUndoBuffers(void) } /* - * Unlock and release undo buffers. This step performed after exiting any - * critical section. + * Unlock and release the undo buffers. This step must be performed after + * exiting any critical section where we have performed undo actions. */ void UnlockReleaseUndoBuffers(void) @@ -909,10 +912,10 @@ UnlockReleaseUndoBuffers(void) /* * Helper function for UndoFetchRecord. It will fetch the undo record pointed * by urp and unpack the record into urec. This function will not release the - * pin on the buffer if complete record is fetched from one buffer, now caller + * pin on the buffer if complete record is fetched from one buffer, so caller * can reuse the same urec to fetch the another undo record which is on the * same block. Caller will be responsible to release the buffer inside urec - * and set it to invalid if he wishes to fetch the record from another block. + * and set it to invalid if it wishes to fetch the record from another block. 
*/ static UnpackedUndoRecord * UndoGetOneRecord(UnpackedUndoRecord *urec, UndoRecPtr urp, RelFileNode rnode, @@ -923,11 +926,11 @@ UndoGetOneRecord(UnpackedUndoRecord *urec, UndoRecPtr urp, RelFileNode rnode, int starting_byte = UndoRecPtrGetPageOffset(urp); int already_decoded = 0; BlockNumber cur_blk; - bool is_undo_splited = false; + bool is_undo_rec_split = false; cur_blk = UndoRecPtrGetBlockNum(urp); - /* If we already have a previous buffer then no need to allocate new. */ + /* If we already have a buffer pin then no need to allocate a new one. */ if (!BufferIsValid(buffer)) { buffer = ReadBufferWithoutRelcache(rnode, UndoLogForkNum, cur_blk, @@ -951,11 +954,11 @@ UndoGetOneRecord(UnpackedUndoRecord *urec, UndoRecPtr urp, RelFileNode rnode, break; starting_byte = UndoLogBlockHeaderSize; - is_undo_splited = true; + is_undo_rec_split = true; /* - * Complete record is not fitting into one buffer so release the - * buffer pin and also set invalid buffer in the undo record. + * The record spans more than a page so we would have copied it (see + * UnpackUndoRecord). In such cases, we can release the buffer. */ urec->uur_buffer = InvalidBuffer; UnlockReleaseBuffer(buffer); @@ -968,10 +971,10 @@ UndoGetOneRecord(UnpackedUndoRecord *urec, UndoRecPtr urp, RelFileNode rnode, } /* - * If we have copied the data then release the buffer. Otherwise just + * If we have copied the data then release the buffer, otherwise, just * unlock it. */ - if (is_undo_splited) + if (is_undo_rec_split) UnlockReleaseBuffer(buffer); else LockBuffer(buffer, BUFFER_LOCK_UNLOCK); @@ -981,29 +984,23 @@ UndoGetOneRecord(UnpackedUndoRecord *urec, UndoRecPtr urp, RelFileNode rnode, /* * Fetch the next undo record for given blkno, offset and transaction id (if - * valid). 
We need to match transaction id along with block number and offset - * because in some cases (like reuse of slot for committed transaction), we - * need to skip the record if it is modified by a transaction later than the - * transaction indicated by previous undo record. For example, consider a - * case where tuple (ctid - 0,1) is modified by transaction id 500 which - * belongs to transaction slot 0. Then, the same tuple is modified by - * transaction id 501 which belongs to transaction slot 1. Then, both the - * transaction slots are marked for reuse. Then, again the same tuple is - * modified by transaction id 502 which has used slot 0. Now, some - * transaction which has started before transaction 500 wants to traverse the - * chain to find visible tuple will keep on rotating infinitely between undo - * tuple written by 502 and 501. In such a case, we need to skip the undo - * tuple written by transaction 502 when we want to find the undo record - * indicated by the previous pointer of undo tuple written by transaction 501. + * valid). The same tuple can be modified by multiple transactions, so during + * undo chain traversal sometimes we need to distinguish based on transaction + * id. Callers that don't have any such requirement can pass + * InvalidTransactionId. + * * Start the search from urp. Caller need to call UndoRecordRelease to release the * resources allocated by this function. * * urec_ptr_out is undo record pointer of the qualified undo record if valid * pointer is passed. + * + * callback function decides whether a particular undo record satisfies the + * condition of the caller. 
*/ UnpackedUndoRecord * UndoFetchRecord(UndoRecPtr urp, BlockNumber blkno, OffsetNumber offset, - TransactionId xid, UndoRecPtr * urec_ptr_out, + TransactionId xid, UndoRecPtr *urec_ptr_out, SatisfyUndoRecordCallback callback) { RelFileNode rnode, diff --git a/src/include/access/undoinsert.h b/src/include/access/undoinsert.h index 012285031c..4b0f1dd82f 100644 --- a/src/include/access/undoinsert.h +++ b/src/include/access/undoinsert.h @@ -39,8 +39,8 @@ typedef bool (*SatisfyUndoRecordCallback) (UnpackedUndoRecord *urec, * undo log only stores mapping for the top most transactions. * If in recovery, 'xid' refers to the transaction id stored in WAL. */ -extern UndoRecPtr PrepareUndoInsert(UnpackedUndoRecord *, UndoPersistence, - TransactionId xid); +extern UndoRecPtr PrepareUndoInsert(UnpackedUndoRecord *, TransactionId xid, + UndoPersistence); /* * Insert a previously-prepared undo record. This will write the actual undo @@ -93,7 +93,7 @@ extern void UndoRecordSetPrevUndoLen(uint16 len); * be done before inserting the prepared undo. If size is > MAX_PREPARED_UNDO * then it will allocate extra memory to hold the extra prepared undo. */ -extern void UndoSetPrepareSize(int max_prepare, UnpackedUndoRecord *undorecords, +extern void UndoSetPrepareSize(UnpackedUndoRecord *undorecords, int nrecords, TransactionId xid, UndoPersistence upersistence); /* diff --git a/src/include/access/undorecord.h b/src/include/access/undorecord.h index af967e84b4..9ca245509c 100644 --- a/src/include/access/undorecord.h +++ b/src/include/access/undorecord.h @@ -101,7 +101,7 @@ typedef struct UndoRecordBlock /* * Identifying information for a transaction to which this undo belongs. This - * also stores the dbid and the progress of the undo apply during rollback. + * also stores the dbid and the progress of the undo apply during rollback. */ typedef struct UndoRecordTransaction {