From bb3a1a0a51f427cc408b7891730309b4e0ef8629 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Mon, 5 Nov 2018 04:20:16 -0800 Subject: [PATCH] undoworker-transaction-rollback-v3 Dilip Kumar, with help from Rafia Sabih, Amit Kapila, Mithun.CY, Thomas Munro and Kuntal Ghosh --- src/backend/access/rmgrdesc/Makefile | 3 +- src/backend/access/rmgrdesc/undoactiondesc.c | 64 +++ src/backend/access/rmgrdesc/xlogdesc.c | 4 +- src/backend/access/transam/rmgr.c | 5 +- src/backend/access/transam/twophase.c | 45 +- src/backend/access/transam/varsup.c | 12 + src/backend/access/transam/xact.c | 441 ++++++++++++++++- src/backend/access/transam/xlog.c | 20 + src/backend/access/undo/Makefile | 2 +- src/backend/access/undo/undoaction.c | 683 ++++++++++++++++++++++++++ src/backend/access/undo/undoactionxlog.c | 49 ++ src/backend/access/undo/undodiscard.c | 458 +++++++++++++++++ src/backend/access/undo/undoinsert.c | 51 +- src/backend/access/undo/undorecord.c | 3 + src/backend/commands/vacuum.c | 11 + src/backend/postmaster/Makefile | 4 +- src/backend/postmaster/bgworker.c | 11 + src/backend/postmaster/discardworker.c | 170 +++++++ src/backend/postmaster/pgstat.c | 7 +- src/backend/postmaster/postmaster.c | 7 + src/backend/postmaster/undoworker.c | 664 +++++++++++++++++++++++++ src/backend/replication/logical/decode.c | 4 + src/backend/storage/ipc/ipci.c | 6 + src/backend/storage/lmgr/lwlocknames.txt | 2 + src/backend/storage/lmgr/proc.c | 2 + src/backend/utils/adt/lockfuncs.c | 1 + src/backend/utils/init/globals.c | 1 + src/backend/utils/misc/guc.c | 11 + src/backend/utils/misc/pg_controldata.c | 9 +- src/backend/utils/misc/postgresql.conf.sample | 7 + src/bin/pg_controldata/pg_controldata.c | 2 + src/bin/pg_resetwal/pg_resetwal.c | 7 + src/bin/pg_rewind/parsexlog.c | 2 +- src/bin/pg_waldump/rmgrdesc.c | 3 +- src/include/access/rmgr.h | 2 +- src/include/access/rmgrlist.h | 47 +- src/include/access/transam.h | 4 + src/include/access/twophase.h | 2 +- src/include/access/undoaction.h | 
28 ++ src/include/access/undoaction_xlog.h | 74 +++ src/include/access/undodiscard.h | 31 ++ src/include/access/undoinsert.h | 2 + src/include/access/undorecord.h | 13 + src/include/access/xact.h | 1 + src/include/access/xlog.h | 3 + src/include/access/xlog_internal.h | 9 +- src/include/catalog/pg_control.h | 7 + src/include/miscadmin.h | 1 + src/include/pgstat.h | 3 + src/include/postmaster/discardworker.h | 25 + src/include/postmaster/undoloop.h | 89 ++++ src/include/postmaster/undoworker.h | 39 ++ src/include/storage/lock.h | 10 + src/include/storage/proc.h | 2 + 54 files changed, 3118 insertions(+), 45 deletions(-) create mode 100644 src/backend/access/rmgrdesc/undoactiondesc.c create mode 100644 src/backend/access/undo/undoaction.c create mode 100644 src/backend/access/undo/undoactionxlog.c create mode 100644 src/backend/access/undo/undodiscard.c create mode 100644 src/backend/postmaster/discardworker.c create mode 100644 src/backend/postmaster/undoworker.c create mode 100644 src/include/access/undoaction.h create mode 100644 src/include/access/undoaction_xlog.h create mode 100644 src/include/access/undodiscard.h create mode 100644 src/include/postmaster/discardworker.h create mode 100644 src/include/postmaster/undoloop.h create mode 100644 src/include/postmaster/undoworker.h diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index 91ad1ef..640d37f 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -11,6 +11,7 @@ include $(top_builddir)/src/Makefile.global OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o genericdesc.o \ gindesc.o gistdesc.o hashdesc.o heapdesc.o logicalmsgdesc.o \ mxactdesc.o nbtdesc.o relmapdesc.o replorigindesc.o seqdesc.o \ - smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o undologdesc.o xactdesc.o xlogdesc.o + smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o undoactiondesc.o \ + undologdesc.o xactdesc.o xlogdesc.o include 
$(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/rmgrdesc/undoactiondesc.c b/src/backend/access/rmgrdesc/undoactiondesc.c new file mode 100644 index 0000000..89343b8 --- /dev/null +++ b/src/backend/access/rmgrdesc/undoactiondesc.c @@ -0,0 +1,64 @@ +/*------------------------------------------------------------------------- + * + * undoactiondesc.c + * rmgr descriptor routines for access/undo/undoactionxlog.c + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/rmgrdesc/undoactiondesc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/undoaction_xlog.h" + +void +undoaction_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_UNDO_PAGE) + { + uint8 *flags = (uint8 *) rec; + + appendStringInfo(buf, "page_contains_tpd_slot: %c ", + (*flags & XLU_PAGE_CONTAINS_TPD_SLOT) ? 'T' : 'F'); + appendStringInfo(buf, "is_page_initialized: %c ", + (*flags & XLU_INIT_PAGE) ? 
'T' : 'F'); + if (*flags & XLU_PAGE_CONTAINS_TPD_SLOT) + { + xl_undoaction_page *xlrec = + (xl_undoaction_page *) ((char *) flags + sizeof(uint8)); + + appendStringInfo(buf, "urec_ptr %lu xid %u trans_slot_id %u", + xlrec->urec_ptr, xlrec->xid, xlrec->trans_slot_id); + } + } + else if (info == XLOG_UNDO_APPLY_PROGRESS) + { + xl_undoapply_progress *xlrec = (xl_undoapply_progress *) rec; + + appendStringInfo(buf, "urec_ptr %lu progress %u", + xlrec->urec_ptr, xlrec->progress); + } +} + +const char * +undoaction_identify(uint8 info) +{ + const char *id = NULL; + + switch (info & ~XLR_INFO_MASK) + { + case XLOG_UNDO_APPLY_PROGRESS: + id = "UNDO APPLY PROGRESS"; + break; + } + + return id; +} diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c index 00741c7..987e39c 100644 --- a/src/backend/access/rmgrdesc/xlogdesc.c +++ b/src/backend/access/rmgrdesc/xlogdesc.c @@ -47,7 +47,8 @@ xlog_desc(StringInfo buf, XLogReaderState *record) "tli %u; prev tli %u; fpw %s; xid %u:%u; oid %u; multi %u; offset %u; " "oldest xid %u in DB %u; oldest multi %u in DB %u; " "oldest/newest commit timestamp xid: %u/%u; " - "oldest running xid %u; %s", + "oldest running xid %u; " + "oldest xid with epoch having undo " UINT64_FORMAT "; %s", (uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo, checkpoint->ThisTimeLineID, checkpoint->PrevTimeLineID, @@ -63,6 +64,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record) checkpoint->oldestCommitTsXid, checkpoint->newestCommitTsXid, checkpoint->oldestActiveXid, + checkpoint->oldestXidWithEpochHavingUndo, (info == XLOG_CHECKPOINT_SHUTDOWN) ? 
"shutdown" : "online"); } else if (info == XLOG_NEXTOID) diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 8b05374..6238240 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -18,6 +18,7 @@ #include "access/multixact.h" #include "access/nbtxlog.h" #include "access/spgxlog.h" +#include "access/undoaction_xlog.h" #include "access/undolog_xlog.h" #include "access/xact.h" #include "access/xlog_internal.h" @@ -31,8 +32,8 @@ #include "utils/relmapper.h" /* must be kept in sync with RmgrData definition in xlog_internal.h */ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ - { name, redo, desc, identify, startup, cleanup, mask }, +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_desc) \ + { name, redo, desc, identify, startup, cleanup, mask, undo, undo_desc }, const RmgrData RmgrTable[RM_MAX_ID + 1] = { #include "access/rmgrlist.h" diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 3942734..83241bc 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -93,6 +93,7 @@ #include "miscadmin.h" #include "pg_trace.h" #include "pgstat.h" +#include "postmaster/undoloop.h" #include "replication/origin.h" #include "replication/syncrep.h" #include "replication/walsender.h" @@ -915,6 +916,12 @@ typedef struct TwoPhaseFileHeader uint16 gidlen; /* length of the GID - GID follows the header */ XLogRecPtr origin_lsn; /* lsn of this record at origin node */ TimestampTz origin_timestamp; /* time of prepare at origin node */ + /* + * We need the locations of start and end undo record pointers when rollbacks + * are to be performed for prepared transactions using zheap relations. 
+ */ + UndoRecPtr start_urec_ptr[UndoPersistenceLevels]; + UndoRecPtr end_urec_ptr[UndoPersistenceLevels]; } TwoPhaseFileHeader; /* @@ -989,7 +996,8 @@ save_state_data(const void *data, uint32 len) * Initializes data structure and inserts the 2PC file header record. */ void -StartPrepare(GlobalTransaction gxact) +StartPrepare(GlobalTransaction gxact, UndoRecPtr *start_urec_ptr, + UndoRecPtr *end_urec_ptr) { PGPROC *proc = &ProcGlobal->allProcs[gxact->pgprocno]; PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; @@ -1027,6 +1035,10 @@ StartPrepare(GlobalTransaction gxact) &hdr.initfileinval); hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */ + /* save the start and end undo record pointers */ + memcpy(hdr.start_urec_ptr, start_urec_ptr, sizeof(hdr.start_urec_ptr)); + memcpy(hdr.end_urec_ptr, end_urec_ptr, sizeof(hdr.end_urec_ptr)); + save_state_data(&hdr, sizeof(TwoPhaseFileHeader)); save_state_data(gxact->gid, hdr.gidlen); @@ -1452,6 +1464,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit) RelFileNode *delrels; int ndelrels; SharedInvalidationMessage *invalmsgs; + int i; + UndoRecPtr start_urec_ptr[UndoPersistenceLevels]; + UndoRecPtr end_urec_ptr[UndoPersistenceLevels]; /* * Validate the GID, and lock the GXACT to ensure that two backends do not @@ -1489,6 +1504,34 @@ FinishPreparedTransaction(const char *gid, bool isCommit) invalmsgs = (SharedInvalidationMessage *) bufptr; bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); + /* save the start and end undo record pointers */ + memcpy(start_urec_ptr, hdr->start_urec_ptr, sizeof(start_urec_ptr)); + memcpy(end_urec_ptr, hdr->end_urec_ptr, sizeof(end_urec_ptr)); + + /* + * Perform undo actions, if there are undologs for this transaction. + * We need to perform undo actions while we are still in transaction. + * Never push rollbacks of temp tables to undo worker. 
+ */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (end_urec_ptr[i] != InvalidUndoRecPtr && !isCommit) + { + bool result = false; + uint64 rollback_size = 0; + + if (i != UNDO_TEMP) + rollback_size = end_urec_ptr[i] - start_urec_ptr[i]; + + if (rollback_size >= rollback_overflow_size * 1024 * 1024) + result = PushRollbackReq(end_urec_ptr[i], start_urec_ptr[i], InvalidOid); + + if (!result) + execute_undo_actions(end_urec_ptr[i], start_urec_ptr[i], true, + true, false); + } + } + /* compute latestXid among all children */ latestXid = TransactionIdLatest(xid, hdr->nsubxacts, children); diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c index cede579..2274ab2 100644 --- a/src/backend/access/transam/varsup.c +++ b/src/backend/access/transam/varsup.c @@ -292,10 +292,22 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid) TransactionId xidStopLimit; TransactionId xidWrapLimit; TransactionId curXid; + TransactionId oldestXidHavingUndo; Assert(TransactionIdIsNormal(oldest_datfrozenxid)); /* + * To determine the last safe xid that can be allocated, we need to + * consider oldestXidHavingUndo. The oldestXidHavingUndo will be only + * valid for zheap storage engine, so it won't impact any other storage + * engine. + */ + oldestXidHavingUndo = GetXidFromEpochXid( + pg_atomic_read_u64(&ProcGlobal->oldestXidWithEpochHavingUndo)); + if (TransactionIdIsValid(oldestXidHavingUndo)) + oldest_datfrozenxid = Min(oldest_datfrozenxid, oldestXidHavingUndo); + + /* * The place where we actually get into deep trouble is halfway around * from the oldest potentially-existing XID. 
(This calculation is * probably off by one or two counts, because the special XIDs reduce the diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 6b7f7fa..80fb1fa 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -41,6 +41,7 @@ #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgstat.h" +#include "postmaster/undoloop.h" #include "replication/logical.h" #include "replication/logicallauncher.h" #include "replication/origin.h" @@ -278,6 +279,20 @@ typedef struct SubXactCallbackItem static SubXactCallbackItem *SubXact_callbacks = NULL; +/* Location in undo log from where to start applying the undo actions. */ +static UndoRecPtr UndoActionStartPtr[UndoPersistenceLevels] = + {InvalidUndoRecPtr, + InvalidUndoRecPtr, + InvalidUndoRecPtr}; + +/* Location in undo log up to which undo actions need to be applied. */ +static UndoRecPtr UndoActionEndPtr[UndoPersistenceLevels] = + {InvalidUndoRecPtr, + InvalidUndoRecPtr, + InvalidUndoRecPtr}; + +/* Do we need to perform any undo actions? */ +static bool PerformUndoActions = false; /* local function prototypes */ static void AssignTransactionId(TransactionState s); @@ -1824,6 +1839,7 @@ StartTransaction(void) { TransactionState s; VirtualTransactionId vxid; + int i; /* * Let's just make sure the state stack is empty @@ -1884,6 +1900,13 @@ StartTransaction(void) nUnreportedXids = 0; s->didLogXid = false; + /* initialize undo record locations for the transaction */ + for(i = 0; i < UndoPersistenceLevels; i++) + { + s->start_urec_ptr[i] = InvalidUndoRecPtr; + s->latest_urec_ptr[i] = InvalidUndoRecPtr; + } + /* * must initialize resource-management stuff first */ @@ -2207,7 +2230,7 @@ CommitTransaction(void) * NB: if you change this routine, better look at CommitTransaction too! 
*/ static void -PrepareTransaction(void) +PrepareTransaction(UndoRecPtr *start_urec_ptr, UndoRecPtr *end_urec_ptr) { TransactionState s = CurrentTransactionState; TransactionId xid = GetCurrentTransactionId(); @@ -2355,7 +2378,7 @@ PrepareTransaction(void) * PREPARED; in particular, pay attention to whether things should happen * before or after releasing the transaction's locks. */ - StartPrepare(gxact); + StartPrepare(gxact, start_urec_ptr, end_urec_ptr); AtPrepare_Notify(); AtPrepare_Locks(); @@ -2478,6 +2501,65 @@ PrepareTransaction(void) RESUME_INTERRUPTS(); } +static void +AtAbort_Rollback(void) +{ + TransactionState s = CurrentTransactionState; + UndoRecPtr latest_urec_ptr[UndoPersistenceLevels]; + int i; + + /* XXX: TODO: check this logic, which was moved out of UserAbortTransactionBlock */ + + memcpy(latest_urec_ptr, s->latest_urec_ptr, sizeof(latest_urec_ptr)); + + /* + * Remember the required information for performing undo actions. So that + * if there is any failure in executing the undo action we can execute + * it later. + */ + memcpy (UndoActionStartPtr, latest_urec_ptr, sizeof(UndoActionStartPtr)); + memcpy (UndoActionEndPtr, s->start_urec_ptr, sizeof(UndoActionEndPtr)); + + /* + * If we are in a valid transaction state then execute the undo action here + * itself, otherwise we have already stored the required information for + * executing the undo action later. + */ + if (CurrentTransactionState->state == TRANS_INPROGRESS) + { + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (latest_urec_ptr[i]) + { + if (i == UNDO_TEMP) + execute_undo_actions(UndoActionStartPtr[i], UndoActionEndPtr[i], + false, true, true); + else + { + uint64 size = latest_urec_ptr[i] - s->start_urec_ptr[i]; + bool result = false; + + /* + * If this is a large rollback request then push it to undo-worker + * through RollbackHT, undo-worker will perform it's undo actions + * later. 
+ */ + if (size >= rollback_overflow_size * 1024 * 1024) + result = PushRollbackReq(UndoActionStartPtr[i], UndoActionEndPtr[i], InvalidOid); + + if (!result) + { + execute_undo_actions(UndoActionStartPtr[i], UndoActionEndPtr[i], + true, true, true); + UndoActionStartPtr[i] = InvalidUndoRecPtr; + } + } + } + } + } + else + PerformUndoActions = true; +} /* * AbortTransaction @@ -2579,6 +2661,7 @@ AbortTransaction(void) */ AfterTriggerEndXact(false); /* 'false' means it's abort */ AtAbort_Portals(); + AtAbort_Rollback(); AtEOXact_LargeObject(false); AtAbort_Notify(); AtEOXact_RelationMap(false, is_parallel_worker); @@ -2787,6 +2870,12 @@ void CommitTransactionCommand(void) { TransactionState s = CurrentTransactionState; + UndoRecPtr end_urec_ptr[UndoPersistenceLevels]; + UndoRecPtr start_urec_ptr[UndoPersistenceLevels]; + int i; + + memcpy(start_urec_ptr, s->start_urec_ptr, sizeof(start_urec_ptr)); + memcpy(end_urec_ptr, s->latest_urec_ptr, sizeof(end_urec_ptr)); switch (s->blockState) { @@ -2876,7 +2965,7 @@ CommitTransactionCommand(void) * return to the idle state. */ case TBLOCK_PREPARE: - PrepareTransaction(); + PrepareTransaction(start_urec_ptr, end_urec_ptr); s->blockState = TBLOCK_DEFAULT; break; @@ -2922,6 +3011,23 @@ CommitTransactionCommand(void) { CommitSubTransaction(); s = CurrentTransactionState; /* changed by pop */ + + /* + * Update the end undo record pointer if it's not valid with + * the currently popped transaction's end undo record pointer. + * This is particularly required when the first command of + * the transaction is of type which does not require an undo, + * e.g. savepoint x. + * Accordingly, update the start undo record pointer. 
+ */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (!UndoRecPtrIsValid(end_urec_ptr[i])) + end_urec_ptr[i] = s->latest_urec_ptr[i]; + + if (UndoRecPtrIsValid(s->start_urec_ptr[i])) + start_urec_ptr[i] = s->start_urec_ptr[i]; + } } while (s->blockState == TBLOCK_SUBCOMMIT); /* If we had a COMMIT command, finish off the main xact too */ if (s->blockState == TBLOCK_END) @@ -2933,7 +3039,7 @@ CommitTransactionCommand(void) else if (s->blockState == TBLOCK_PREPARE) { Assert(s->parent == NULL); - PrepareTransaction(); + PrepareTransaction(start_urec_ptr, end_urec_ptr); s->blockState = TBLOCK_DEFAULT; } else @@ -3027,6 +3133,18 @@ void AbortCurrentTransaction(void) { TransactionState s = CurrentTransactionState; + int i; + + /* + * The undo actions are allowed to be executed at the end of statement + * execution when we are not in transaction block, otherwise they are + * executed when user explicitly ends the transaction. + * + * So if we are in a transaction block don't set the PerformUndoActions + * because this flag will be set when user explicitly issue rollback or + * rollback to savepoint. + */ + PerformUndoActions = false; switch (s->blockState) { @@ -3061,6 +3179,16 @@ AbortCurrentTransaction(void) AbortTransaction(); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; + + /* + * We are outside the transaction block so remember the required + * information to perform undo actions and also set the + * PerformUndoActions so that we execute it before completing this + * command. + */ + PerformUndoActions = true; + memcpy (UndoActionStartPtr, s->latest_urec_ptr, sizeof(UndoActionStartPtr)); + memcpy (UndoActionEndPtr, s->start_urec_ptr, sizeof(UndoActionEndPtr)); break; /* @@ -3097,6 +3225,9 @@ AbortCurrentTransaction(void) AbortTransaction(); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; + + /* Failed during commit, so we need to perform the undo actions. 
*/ + PerformUndoActions = true; break; /* @@ -3116,6 +3247,9 @@ AbortCurrentTransaction(void) case TBLOCK_ABORT_END: CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; + + /* Failed during commit, so we need to perform the undo actions. */ + PerformUndoActions = true; break; /* @@ -3126,6 +3260,12 @@ AbortCurrentTransaction(void) AbortTransaction(); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; + + /* + * Failed while executing the rollback command, need perform any + * pending undo actions. + */ + PerformUndoActions = true; break; /* @@ -3137,6 +3277,12 @@ AbortCurrentTransaction(void) AbortTransaction(); CleanupTransaction(); s->blockState = TBLOCK_DEFAULT; + + /* + * Perform any pending actions if failed while preparing the + * transaction. + */ + PerformUndoActions = true; break; /* @@ -3159,6 +3305,17 @@ AbortCurrentTransaction(void) case TBLOCK_SUBCOMMIT: case TBLOCK_SUBABORT_PENDING: case TBLOCK_SUBRESTART: + /* + * If we are here and still UndoActionStartPtr is valid that means + * the subtransaction failed while executing the undo action, so + * store its undo action start point in parent so that parent can + * start its undo action from this point. + */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (UndoRecPtrIsValid(UndoActionStartPtr[i])) + s->parent->latest_urec_ptr[i] = UndoActionStartPtr[i]; + } AbortSubTransaction(); CleanupSubTransaction(); AbortCurrentTransaction(); @@ -3176,6 +3333,109 @@ AbortCurrentTransaction(void) } /* + * XactPerformUndoActionsIfPending - Execute pending undo actions. + * + * If the parent transaction state is valid (when there is an error in the + * subtransaction and rollback to savepoint is executed), then allow to + * perform undo actions in it, otherwise perform them in a new transaction. 
+ */ +void +XactPerformUndoActionsIfPending() +{ + TransactionState s = CurrentTransactionState; + uint64 rollback_size = 0; + bool new_xact = true, result = false, no_pending_action = true; + UndoRecPtr parent_latest_urec_ptr[UndoPersistenceLevels]; + int i = 0; + + if (!PerformUndoActions) + return; + + /* If there is no undo log for any persistence level, then return. */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (UndoRecPtrIsValid(UndoActionStartPtr[i])) + { + no_pending_action = false; + break; + } + } + + if (no_pending_action) + { + PerformUndoActions = false; + return; + } + + /* + * Execute undo actions under parent transaction, if any. Otherwise start + * a new transaction. + */ + if (GetTopTransactionIdIfAny() != InvalidTransactionId) + { + memcpy(parent_latest_urec_ptr, s->latest_urec_ptr, + sizeof (parent_latest_urec_ptr)); + new_xact = false; + } + + /* + * If this is a large rollback request then push it to undo-worker + * through RollbackHT, undo-worker will perform it's undo actions later. + * Never push the rollbacks for temp tables. + */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (!UndoRecPtrIsValid(UndoActionStartPtr[i])) + continue; + + if (i == UNDO_TEMP) + goto perform_rollback; + else + rollback_size = UndoActionStartPtr[i] - UndoActionEndPtr[i]; + + if (new_xact && rollback_size > rollback_overflow_size * 1024 * 1024) + result = PushRollbackReq(UndoActionStartPtr[i], UndoActionEndPtr[i], InvalidOid); + + if (!result) + { +perform_rollback: + if (new_xact) + { + TransactionState xact; + + /* Start a new transaction for performing the rollback */ + StartTransactionCommand(); + xact = CurrentTransactionState; + + /* + * Store the previous transactions start and end undo record + * pointers into this transaction's state so that if there is + * some error while performing undo actions we can restart + * from begining. 
+ */ + memcpy(xact->start_urec_ptr, UndoActionEndPtr, + sizeof(UndoActionEndPtr)); + memcpy(xact->latest_urec_ptr, UndoActionStartPtr, + sizeof(UndoActionStartPtr)); + } + + execute_undo_actions(UndoActionStartPtr[i], UndoActionEndPtr[i], + new_xact, true, true); + + if (new_xact) + CommitTransactionCommand(); + else + { + /* Restore parent's state. */ + s->latest_urec_ptr[i] = parent_latest_urec_ptr[i]; + } + } + } + + PerformUndoActions = false; +} + +/* * PreventInTransactionBlock * * This routine is to be called by statements that must not run inside @@ -3576,6 +3836,10 @@ EndTransactionBlock(void) { TransactionState s = CurrentTransactionState; bool result = false; + UndoRecPtr latest_urec_ptr[UndoPersistenceLevels]; + int i ; + + memcpy(latest_urec_ptr, s->latest_urec_ptr, sizeof(latest_urec_ptr)); switch (s->blockState) { @@ -3621,6 +3885,16 @@ EndTransactionBlock(void) elog(FATAL, "EndTransactionBlock: unexpected state %s", BlockStateAsString(s->blockState)); s = s->parent; + + /* + * We are calculating latest_urec_ptr, even though its a commit + * case. This is to handle any error during the commit path. + */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (!UndoRecPtrIsValid(latest_urec_ptr[i])) + latest_urec_ptr[i] = s->latest_urec_ptr[i]; + } } if (s->blockState == TBLOCK_INPROGRESS) s->blockState = TBLOCK_END; @@ -3646,6 +3920,11 @@ EndTransactionBlock(void) elog(FATAL, "EndTransactionBlock: unexpected state %s", BlockStateAsString(s->blockState)); s = s->parent; + for (i = 0; i < UndoPersistenceLevels; i++) + { + if(!UndoRecPtrIsValid(latest_urec_ptr[i])) + latest_urec_ptr[i] = s->latest_urec_ptr[i]; + } } if (s->blockState == TBLOCK_INPROGRESS) s->blockState = TBLOCK_ABORT_PENDING; @@ -3698,6 +3977,18 @@ EndTransactionBlock(void) break; } + /* + * We need to perform undo actions if the transaction is failed. Remember + * the required information to perform undo actions at the end of + * statement execution. 
+ */ + if (!result) + PerformUndoActions = true; + + memcpy(UndoActionStartPtr, latest_urec_ptr, sizeof(UndoActionStartPtr)); + memcpy(UndoActionEndPtr, TopTransactionStateData.start_urec_ptr, + sizeof(UndoActionEndPtr)); + return result; } @@ -3955,6 +4246,12 @@ ReleaseSavepoint(const char *name) TransactionState s = CurrentTransactionState; TransactionState target, xact; + UndoRecPtr latest_urec_ptr[UndoPersistenceLevels]; + UndoRecPtr start_urec_ptr[UndoPersistenceLevels]; + int i = 0; + + memcpy(latest_urec_ptr, s->latest_urec_ptr, sizeof(latest_urec_ptr)); + memcpy(start_urec_ptr, s->start_urec_ptr, sizeof(start_urec_ptr)); /* * Workers synchronize transaction state at the beginning of each parallel @@ -4048,8 +4345,32 @@ ReleaseSavepoint(const char *name) if (xact == target) break; xact = xact->parent; + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (!UndoRecPtrIsValid(latest_urec_ptr[i])) + latest_urec_ptr[i] = xact->latest_urec_ptr[i]; + + if (UndoRecPtrIsValid(xact->start_urec_ptr[i])) + start_urec_ptr[i] = xact->start_urec_ptr[i]; + } + Assert(PointerIsValid(xact)); } + + /* + * Before cleaning up the current sub transaction state, overwrite parent + * transaction's latest_urec_ptr with current transaction's latest_urec_ptr + * so that in case parent transaction get aborted we will not skip + * performing undo for this transaction. Also set the start_urec_ptr if + * parent start_urec_ptr is not valid. 
+ */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (UndoRecPtrIsValid(latest_urec_ptr[i])) + xact->parent->latest_urec_ptr[i] = latest_urec_ptr[i]; + if (!UndoRecPtrIsValid(xact->parent->start_urec_ptr[i])) + xact->parent->start_urec_ptr[i] = start_urec_ptr[i]; + } } /* @@ -4260,6 +4581,7 @@ void ReleaseCurrentSubTransaction(void) { TransactionState s = CurrentTransactionState; + int i; /* * Workers synchronize transaction state at the beginning of each parallel @@ -4278,6 +4600,22 @@ ReleaseCurrentSubTransaction(void) BlockStateAsString(s->blockState)); Assert(s->state == TRANS_INPROGRESS); MemoryContextSwitchTo(CurTransactionContext); + + /* + * Before cleaning up the current sub transaction state, overwrite parent + * transaction's latest_urec_ptr with current transaction's latest_urec_ptr + * so that in case parent transaction get aborted we will not skip + * performing undo for this transaction. + */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (UndoRecPtrIsValid(s->latest_urec_ptr[i])) + s->parent->latest_urec_ptr[i] = s->latest_urec_ptr[i]; + + if (!UndoRecPtrIsValid(s->parent->start_urec_ptr[i])) + s->parent->start_urec_ptr[i] = s->start_urec_ptr[i]; + } + CommitSubTransaction(); s = CurrentTransactionState; /* changed by pop */ Assert(s->state == TRANS_INPROGRESS); @@ -4294,6 +4632,14 @@ void RollbackAndReleaseCurrentSubTransaction(void) { TransactionState s = CurrentTransactionState; + UndoRecPtr latest_urec_ptr[UndoPersistenceLevels] = {InvalidUndoRecPtr, + InvalidUndoRecPtr, + InvalidUndoRecPtr}; + UndoRecPtr start_urec_ptr[UndoPersistenceLevels] = {InvalidUndoRecPtr, + InvalidUndoRecPtr, + InvalidUndoRecPtr}; + UndoRecPtr parent_latest_urec_ptr[UndoPersistenceLevels]; + int i; /* * Unlike ReleaseCurrentSubTransaction(), this is nominally permitted @@ -4340,6 +4686,19 @@ RollbackAndReleaseCurrentSubTransaction(void) if (s->blockState == TBLOCK_SUBINPROGRESS) AbortSubTransaction(); + /* + * Remember the required information to perform 
undo actions before + * cleaning up the subtransaction state. + */ + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (UndoRecPtrIsValid(s->latest_urec_ptr[i])) + { + latest_urec_ptr[i] = s->latest_urec_ptr[i]; + start_urec_ptr[i] = s->start_urec_ptr[i]; + } + } + /* And clean it up, too */ CleanupSubTransaction(); @@ -4348,6 +4707,30 @@ RollbackAndReleaseCurrentSubTransaction(void) s->blockState == TBLOCK_INPROGRESS || s->blockState == TBLOCK_IMPLICIT_INPROGRESS || s->blockState == TBLOCK_STARTED); + + for (i = 0; i < UndoPersistenceLevels; i++) + { + if (UndoRecPtrIsValid(latest_urec_ptr[i])) + { + parent_latest_urec_ptr[i] = s->latest_urec_ptr[i]; + + /* + * Store the undo action start point in the parent state so that + * we can apply undo actions these undos also during rollback of + * parent transaction in case of error while applying the undo + * actions. + */ + s->latest_urec_ptr[i] = latest_urec_ptr[i]; + execute_undo_actions(latest_urec_ptr[i], start_urec_ptr[i], false, + true, true); + + /* Restore parent state. */ + s->latest_urec_ptr[i] = parent_latest_urec_ptr[i]; + } + } + + /* Successfully performed undo actions so reset the flag. 
*/ + PerformUndoActions = false; } /* @@ -4561,6 +4944,7 @@ static void StartSubTransaction(void) { TransactionState s = CurrentTransactionState; + int i; if (s->state != TRANS_DEFAULT) elog(WARNING, "StartSubTransaction while in %s state", @@ -4578,6 +4962,13 @@ StartSubTransaction(void) AtSubStart_Notify(); AfterTriggerBeginSubXact(); + /* initialize undo record locations for the transaction */ + for(i = 0; i < UndoPersistenceLevels; i++) + { + s->start_urec_ptr[i] = InvalidUndoRecPtr; + s->latest_urec_ptr[i] = InvalidUndoRecPtr; + } + s->state = TRANS_INPROGRESS; /* @@ -4701,6 +5092,47 @@ CommitSubTransaction(void) PopTransaction(); } +static void +AtSubAbort_Rollback(TransactionState s) +{ + UndoRecPtr latest_urec_ptr[UndoPersistenceLevels]; + UndoRecPtr start_urec_ptr[UndoPersistenceLevels]; + int i = 0; + + /* XXX: TODO: Check this logic, which was moved out of RollbackToSavepoint() */ + + memcpy(latest_urec_ptr, s->latest_urec_ptr, sizeof(latest_urec_ptr)); + memcpy(start_urec_ptr, s->start_urec_ptr, sizeof(start_urec_ptr)); + + /* + * Remember the required information for performing undo actions. So that + * if there is any failure in executing the undo action we can execute + * it later. + */ + memcpy (UndoActionStartPtr, latest_urec_ptr, sizeof(UndoActionStartPtr)); + memcpy (UndoActionEndPtr, start_urec_ptr, sizeof(UndoActionEndPtr)); + + /* + * If we are in a valid transaction state then execute the undo action here + * itself, otherwise we have already stored the required information for + * executing the undo action later. 
+ */ + if (s->state == TRANS_INPROGRESS) + { + for ( i = 0; i < UndoPersistenceLevels; i++) + { + if (UndoRecPtrIsValid(latest_urec_ptr[i])) + { + execute_undo_actions(latest_urec_ptr[i], start_urec_ptr[i], false, true, false); + s->latest_urec_ptr[i] = InvalidUndoRecPtr; + UndoActionStartPtr[i] = InvalidUndoRecPtr; + } + } + } + else + PerformUndoActions = true; +} + /* * AbortSubTransaction */ @@ -4795,6 +5227,7 @@ AbortSubTransaction(void) s->parent->subTransactionId, s->curTransactionOwner, s->parent->curTransactionOwner); + AtSubAbort_Rollback(s); AtEOSubXact_LargeObject(false, s->subTransactionId, s->parent->subTransactionId); AtSubAbort_Notify(); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 23f23e7..cdc085f 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -5172,6 +5172,7 @@ BootStrapXLOG(void) checkPoint.newestCommitTsXid = InvalidTransactionId; checkPoint.time = (pg_time_t) time(NULL); checkPoint.oldestActiveXid = InvalidTransactionId; + checkPoint.oldestXidWithEpochHavingUndo = InvalidTransactionId; ShmemVariableCache->nextXid = checkPoint.nextXid; ShmemVariableCache->nextOid = checkPoint.nextOid; @@ -6792,6 +6793,10 @@ StartupXLOG(void) (errmsg_internal("commit timestamp Xid oldest/newest: %u/%u", checkPoint.oldestCommitTsXid, checkPoint.newestCommitTsXid))); + ereport(DEBUG1, + (errmsg_internal("oldest xid with epoch having undo: " UINT64_FORMAT, + checkPoint.oldestXidWithEpochHavingUndo))); + if (!TransactionIdIsNormal(checkPoint.nextXid)) ereport(PANIC, (errmsg("invalid next transaction ID"))); @@ -6809,6 +6814,10 @@ StartupXLOG(void) XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch; XLogCtl->ckptXid = checkPoint.nextXid; + /* Read oldest xid having undo from checkpoint and set in proc global. 
*/ + pg_atomic_write_u64(&ProcGlobal->oldestXidWithEpochHavingUndo, + checkPoint.oldestXidWithEpochHavingUndo); + /* * Initialize replication slots, before there's a chance to remove * required resources. @@ -8979,6 +8988,9 @@ CreateCheckPoint(int flags) checkPoint.nextOid += ShmemVariableCache->oidCount; LWLockRelease(OidGenLock); + checkPoint.oldestXidWithEpochHavingUndo = + pg_atomic_read_u64(&ProcGlobal->oldestXidWithEpochHavingUndo); + MultiXactGetCheckptMulti(shutdown, &checkPoint.nextMulti, &checkPoint.nextMultiOffset, @@ -9889,6 +9901,9 @@ xlog_redo(XLogReaderState *record) MultiXactAdvanceOldest(checkPoint.oldestMulti, checkPoint.oldestMultiDB); + pg_atomic_write_u64(&ProcGlobal->oldestXidWithEpochHavingUndo, + checkPoint.oldestXidWithEpochHavingUndo); + /* * No need to set oldestClogXid here as well; it'll be set when we * redo an xl_clog_truncate if it changed since initialization. @@ -9947,6 +9962,8 @@ xlog_redo(XLogReaderState *record) /* ControlFile->checkPointCopy always tracks the latest ckpt XID */ ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch; ControlFile->checkPointCopy.nextXid = checkPoint.nextXid; + ControlFile->checkPointCopy.oldestXidWithEpochHavingUndo = + checkPoint.oldestXidWithEpochHavingUndo; /* Update shared-memory copy of checkpoint XID/epoch */ SpinLockAcquire(&XLogCtl->info_lck); @@ -9996,6 +10013,9 @@ xlog_redo(XLogReaderState *record) MultiXactAdvanceNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset); + pg_atomic_write_u64(&ProcGlobal->oldestXidWithEpochHavingUndo, + checkPoint.oldestXidWithEpochHavingUndo); + /* * NB: This may perform multixact truncation when replaying WAL * generated by an older primary. diff --git a/src/backend/access/undo/Makefile b/src/backend/access/undo/Makefile index f41e8f7..fdf7f7d 100644 --- a/src/backend/access/undo/Makefile +++ b/src/backend/access/undo/Makefile @@ -12,6 +12,6 @@ subdir = src/backend/access/undo top_builddir = ../../../.. 
include $(top_builddir)/src/Makefile.global -OBJS = undoinsert.o undolog.o undorecord.o +OBJS = undoaction.o undoactionxlog.o undodiscard.o undoinsert.o undolog.o undorecord.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/undo/undoaction.c b/src/backend/access/undo/undoaction.c new file mode 100644 index 0000000..a7bf506 --- /dev/null +++ b/src/backend/access/undo/undoaction.c @@ -0,0 +1,683 @@ +/*------------------------------------------------------------------------- + * + * undoaction.c + *	  execute undo actions + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/undo/undoaction.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/transam.h" +#include "access/undoaction.h" +#include "access/undoaction_xlog.h" +#include "access/undolog.h" +#include "access/undorecord.h" +#include "access/visibilitymap.h" +#include "access/xact.h" +#include "access/xloginsert.h" +#include "access/xlog_internal.h" +#include "nodes/pg_list.h" +#include "pgstat.h" +#include "postmaster/undoloop.h" +#include "storage/block.h" +#include "storage/buf.h" +#include "storage/bufmgr.h" +#include "utils/rel.h" +#include "utils/relfilenodemap.h" +#include "miscadmin.h" +#include "storage/shmem.h" +#include "access/undodiscard.h" + +#define ROLLBACK_HT_SIZE 1024 + +static bool execute_undo_actions_page(List *luinfo, UndoRecPtr urec_ptr, + Oid reloid, TransactionId xid, BlockNumber blkno, + bool blk_chain_complete, bool norellock, int options); +static void RollbackHTRemoveEntry(UndoRecPtr start_urec_ptr); + +/* This is the hash table to store all the rollback requests. */ +static HTAB *RollbackHT; + +/* + * execute_undo_actions - Execute the undo actions + * + * from_urecptr - undo record pointer from where to start applying undo action.
+ * to_urecptr - undo record pointer up to which point to apply undo action. + * nopartial - true if rollback is for complete transaction. + * rewind - whether to rewind the insert location of the undo log or not. + * Only the backend that executed the transaction can rewind, but + * any other process e.g. undo worker should not rewind it. + * Because, if the backend has already inserted new undo records + * for the next transaction and if we rewind then we will lose + * the undo record inserted for the new transaction. + * rellock - if the caller already has the lock on the required relation, + * then this flag is false, i.e. we do not need to acquire any + * lock here. If the flag is true then we need to acquire lock + * here itself, because caller will not be having any lock. + * When we are performing undo actions for prepared transactions, + * or for rollback to savepoint, we need not lock as we already + * have the lock on the table. In cases like error or when + * rolling back from the undo worker we need to have proper locks. + */ +void +execute_undo_actions(UndoRecPtr from_urecptr, UndoRecPtr to_urecptr, + bool nopartial, bool rewind, bool rellock) +{ + UnpackedUndoRecord *uur = NULL; + UndoRecPtr urec_ptr, prev_urec_ptr; + UndoRecPtr save_urec_ptr; + Oid prev_reloid = InvalidOid; + ForkNumber prev_fork = InvalidForkNumber; + BlockNumber prev_block = InvalidBlockNumber; + List *luinfo = NIL; + bool more_undo; + int options = 0; + TransactionId xid = InvalidTransactionId; + UndoRecInfo *urec_info; + + Assert(from_urecptr != InvalidUndoRecPtr); + Assert(UndoRecPtrGetLogNo(from_urecptr) != UndoRecPtrGetLogNo(to_urecptr) || + from_urecptr >= to_urecptr); + /* + * If the location up to which rollback needs to be done is not provided, + * then rollback the complete transaction. + * FIXME: this won't work if undolog crossed the limit of 1TB, because + * then from_urecptr and to_urecptr will be from different lognos.
+ */ + if (to_urecptr == InvalidUndoRecPtr) + { + UndoLogNumber logno = UndoRecPtrGetLogNo(from_urecptr); + to_urecptr = UndoLogGetLastXactStartPoint(logno); + } + + save_urec_ptr = urec_ptr = from_urecptr; + + if (nopartial) + { + uur = UndoFetchRecord(urec_ptr, InvalidBlockNumber, InvalidOffsetNumber, + InvalidTransactionId, NULL, NULL); + if (uur == NULL) + return; + + xid = uur->uur_xid; + UndoRecordRelease(uur); + uur = NULL; + + /* + * Grab the undo action apply lock before start applying the undo action + * this will prevent applying undo actions concurrently. If we do not + * get the lock that mean its already being applied concurrently or the + * discard worker might be pushing its request to the rollback hash + * table + */ + if (!ConditionTransactionUndoActionLock(xid)) + return; + } + + prev_urec_ptr = InvalidUndoRecPtr; + while (prev_urec_ptr != to_urecptr) + { + Oid reloid = InvalidOid; + uint16 urec_prevlen; + bool non_page; + + more_undo = true; + + prev_urec_ptr = urec_ptr; + + /* Fetch the undo record for given undo_recptr. */ + uur = UndoFetchRecord(urec_ptr, InvalidBlockNumber, + InvalidOffsetNumber, InvalidTransactionId, NULL, NULL); + + /* If there is no info block, this is not a page-based undo record. */ + non_page = uur && !(uur->uur_info & UREC_INFO_BLOCK); + + if (uur != NULL && !non_page) + reloid = RelidByRelfilenode(uur->uur_tsid, uur->uur_relfilenode); + + xid = uur->uur_xid; + + if (non_page) + { + prev_reloid = InvalidOid; + urec_prevlen = uur->uur_prevlen; + save_urec_ptr = uur->uur_blkprev; + + /* + * Execute individual undo actions not associated with a page + * immediately. 
+ */ + urec_info = palloc(sizeof(UndoRecInfo)); + urec_info->uur = uur; + urec_info->urp = urec_ptr; + luinfo = lappend(luinfo, urec_info); + execute_undo_actions_page(luinfo, urec_ptr, reloid, xid, + InvalidBlockNumber, false, rellock, + 0); + pfree(urec_info); + urec_info = NULL; + list_free(luinfo); + luinfo = NIL; + UndoRecordRelease(uur); + + /* Follow undo chain until to_urecptr. */ + if (urec_prevlen > 0 && urec_ptr != to_urecptr) + { + urec_ptr = UndoGetPrevUndoRecptr(urec_ptr, urec_prevlen); + continue; + } + else + more_undo = false; + } + /* + * If the record is already discarded by undo worker or if the relation + * is dropped or truncated, then we cannot fetch record successfully. + * Hence, skip quietly. + * + * Note: reloid remains InvalidOid for a discarded record. + */ + else if (!OidIsValid(reloid)) + { + /* release the undo records for which action has been replayed */ + while (luinfo) + { + UndoRecInfo *urec_info = (UndoRecInfo *) linitial(luinfo); + + UndoRecordRelease(urec_info->uur); + pfree(urec_info); + luinfo = list_delete_first(luinfo); + } + + urec_prevlen = uur->uur_prevlen; + + /* Release the just-fetched record */ + if (uur != NULL) + UndoRecordRelease(uur); + + /* The undo chain must continue till we reach to_urecptr */ + if (urec_prevlen > 0 && urec_ptr != to_urecptr) + { + urec_ptr = UndoGetPrevUndoRecptr(urec_ptr, urec_prevlen); + continue; + } + else + more_undo = false; + } + else if (!OidIsValid(prev_reloid) || + (prev_reloid == reloid && + prev_fork == uur->uur_fork && + prev_block == uur->uur_block)) + { + /* Collect the undo records that belong to the same page. */ + prev_reloid = reloid; + prev_fork = uur->uur_fork; + prev_block = uur->uur_block; + + /* Prepare an undo record information element. 
*/ + urec_info = palloc(sizeof(UndoRecInfo)); + urec_info->urp = urec_ptr; + urec_info->uur = uur; + + luinfo = lappend(luinfo, urec_info); + urec_prevlen = uur->uur_prevlen; + save_urec_ptr = uur->uur_blkprev; + + if (uur->uur_info & UREC_INFO_PAYLOAD_CONTAINS_SLOT) + options |= UNDO_ACTION_UPDATE_TPD; + + /* The undo chain must continue till we reach to_urecptr */ + if (urec_prevlen > 0 && urec_ptr != to_urecptr) + { + urec_ptr = UndoGetPrevUndoRecptr(urec_ptr, urec_prevlen); + continue; + } + else + more_undo = false; + } + else + { + more_undo = true; + } + + /* + * If no more undo is left to be processed and we are rolling back the + * complete transaction, then we can consider that the undo chain for a + * block is complete. + * If the previous undo pointer in the page is invalid, then also the + * undo chain for the current block is completed. + */ + if (luinfo && + ((!more_undo && nopartial) || !UndoRecPtrIsValid(save_urec_ptr))) + { + execute_undo_actions_page(luinfo, save_urec_ptr, prev_reloid, + xid, prev_block, true, rellock, options); + /* Done with the page so reset the options. */ + options = 0; + } + else if (luinfo) + { + execute_undo_actions_page(luinfo, save_urec_ptr, prev_reloid, + xid, prev_block, false, rellock, options); + /* Done with the page so reset the options. */ + options = 0; + } + + /* release the undo records for which action has been replayed */ + while (luinfo) + { + UndoRecInfo *urec_info = (UndoRecInfo *) linitial(luinfo); + + UndoRecordRelease(urec_info->uur); + pfree(urec_info); + luinfo = list_delete_first(luinfo); + } + + /* + * There are still more records to process, so keep moving backwards + * in the chain. + */ + if (more_undo) + { + /* Prepare an undo record information element. 
*/ + urec_info = palloc(sizeof(UndoRecInfo)); + urec_info->urp = urec_ptr; + urec_info->uur = uur; + luinfo = lappend(luinfo, urec_info); + + prev_reloid = reloid; + prev_fork = uur->uur_fork; + prev_block = uur->uur_block; + save_urec_ptr = uur->uur_blkprev; + + if (uur->uur_info & UREC_INFO_PAYLOAD_CONTAINS_SLOT) + options |= UNDO_ACTION_UPDATE_TPD; + + /* + * Continue to process the records if this is not the last undo + * record in chain. + */ + urec_prevlen = uur->uur_prevlen; + if (urec_prevlen > 0 && urec_ptr != to_urecptr) + urec_ptr = UndoGetPrevUndoRecptr(urec_ptr, urec_prevlen); + else + break; + } + else + break; + } + + /* Apply the undo actions for the remaining records. */ + if (list_length(luinfo)) + { + execute_undo_actions_page(luinfo, save_urec_ptr, prev_reloid, + xid, prev_block, nopartial ? true : false, + rellock, options); + + /* release the undo records for which action has been replayed */ + while (luinfo) + { + UndoRecInfo *urec_info = (UndoRecInfo *) linitial(luinfo); + + UndoRecordRelease(urec_info->uur); + pfree(urec_info); + luinfo = list_delete_first(luinfo); + } + } + + if (rewind) + { + /* Read the prevlen from the first record of this transaction. */ + uur = UndoFetchRecord(to_urecptr, InvalidBlockNumber, + InvalidOffsetNumber, InvalidTransactionId, + NULL, NULL); + /* + * If undo is already discarded before we rewind, then do nothing. + */ + if (uur == NULL) + return; + + + /* + * Rewind the insert location to start of this transaction. This is + * to avoid reapplying some intermediate undo. We do not need to wal + * log this information here, because if the system crash before we + * rewind the insert pointer then after recovery we can identify + * whether the undo is already applied or not from the slot undo record + * pointer. Also set the correct prevlen value (what we have fetched + * from the undo). 
+ */ + UndoLogRewind(to_urecptr, uur->uur_prevlen); + + UndoRecordRelease(uur); + } + + if (nopartial) + { + /* + * Set undo action apply completed in the transaction header if this is + * a main transaction and we have not rewound its undo. + */ + if (!rewind) + { + /* + * Undo action is applied so delete the hash table entry and release + * the undo action lock. + */ + RollbackHTRemoveEntry(from_urecptr); + + /* + * Prepare and update the progress of the undo action apply in the + * transaction header. + */ + PrepareUpdateUndoActionProgress(to_urecptr, 1); + + START_CRIT_SECTION(); + + /* Update the progress in the transaction header. */ + UndoRecordUpdateTransInfo(); + + /* WAL log the undo apply progress. */ + { + xl_undoapply_progress xlrec; + + xlrec.urec_ptr = to_urecptr; + xlrec.progress = 1; + + /* + * FIXME : We need to register undo buffers and set LSN for them + * that will be required for FPW of the undo buffers. + */ + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + + (void) XLogInsert(RM_UNDOACTION_ID, XLOG_UNDO_APPLY_PROGRESS); + } + + END_CRIT_SECTION(); + UnlockReleaseUndoBuffers(); + } + + TransactionUndoActionLockRelease(xid); + } +} + +/* + * execute_undo_actions_page - Execute the undo actions for a page + * + * After applying all the undo actions for a page, we clear the transaction + * slot on a page if the undo chain for block is complete, otherwise just + * rewind the undo pointer to the last record for that block that precedes + * the last undo record for which action is replayed. + * + * luinfo - list of undo records (along with their location) for which undo + * action needs to be replayed. + * urec_ptr - undo record pointer to which we need to rewind. + * reloid - OID of relation on which undo actions needs to be applied. + * blkno - block number on which undo actions needs to be applied. + * blk_chain_complete - indicates whether the undo chain for block is + * complete. 
+ * nopartial - true if rollback is for complete transaction. If we are not + * rolling back the complete transaction then we need to apply the + * undo action for UNDO_INVALID_XACT_SLOT also because in such + * case we will rewind the insert undo location. + * rellock - if the caller already has the lock on the required relation, + * then this flag is false, i.e. we do not need to acquire any + * lock here. If the flag is true then we need to acquire lock + * here itself, because caller will not be having any lock. + * When we are performing undo actions for prepared transactions, + * or for rollback to savepoint, we need not to lock as we already + * have the lock on the table. In cases like error or when + * rollbacking from the undo worker we need to have proper locks. + * options - options for executing undo actions. + * + * returns true, if successfully applied the undo actions, otherwise, false. + */ +static bool +execute_undo_actions_page(List *luinfo, UndoRecPtr urec_ptr, Oid reloid, + TransactionId xid, BlockNumber blkno, + bool blk_chain_complete, bool rellock, int options) +{ + UndoRecInfo *first; + + /* + * All records passed to us are for the same RMGR, so we just use the + * first record to dispatch. + */ + Assert(luinfo != NIL); + first = (UndoRecInfo *) linitial(luinfo); + + return RmgrTable[first->uur->uur_rmid].rm_undo(luinfo, urec_ptr, reloid, + xid, blkno, + blk_chain_complete, rellock, + options); +} + +/* + * To return the size of the hash-table for rollbacks. + */ +int +RollbackHTSize(void) +{ + return hash_estimate_size(ROLLBACK_HT_SIZE, sizeof(RollbackHashEntry)); +} + +/* + * To initialize the hash-table for rollbacks in shared memory + * for the given size. 
+ */ +void +InitRollbackHashTable(void) +{ + HASHCTL info; + MemSet(&info, 0, sizeof(info)); + + info.keysize = sizeof(UndoRecPtr); + info.entrysize = sizeof(RollbackHashEntry); + info.hash = tag_hash; + + RollbackHT = ShmemInitHash("Undo actions Lookup Table", + ROLLBACK_HT_SIZE, ROLLBACK_HT_SIZE, &info, + HASH_ELEM | HASH_FUNCTION | HASH_FIXED_SIZE); +} + +/* + * To push the rollback requests from backend to the hash-table. + * Return true if the request is successfully added, else false + * and the caller may execute undo actions itself. + */ +bool +PushRollbackReq(UndoRecPtr start_urec_ptr, UndoRecPtr end_urec_ptr, Oid dbid) +{ + bool found = false; + RollbackHashEntry *rh; + + Assert(UndoRecPtrGetLogNo(start_urec_ptr) != UndoRecPtrGetLogNo(end_urec_ptr) || + start_urec_ptr >= end_urec_ptr); + /* + * If the location up to which rollback needs to be done is not provided, + * then rollback the complete transaction. + */ + if (start_urec_ptr == InvalidUndoRecPtr) + { + UndoLogNumber logno = UndoRecPtrGetLogNo(end_urec_ptr); + start_urec_ptr = UndoLogGetLastXactStartPoint(logno); + } + + Assert(UndoRecPtrIsValid(start_urec_ptr)); + + /* If there is no space to accommodate new request, then we can't proceed. */ + if (RollbackHTIsFull()) + return false; + + if(!UndoRecPtrIsValid(end_urec_ptr)) + { + UndoLogNumber logno = UndoRecPtrGetLogNo(start_urec_ptr); + end_urec_ptr = UndoLogGetLastXactStartPoint(logno); + } + + LWLockAcquire(RollbackHTLock, LW_EXCLUSIVE); + + rh = (RollbackHashEntry *) hash_search(RollbackHT, &start_urec_ptr, + HASH_ENTER_NULL, &found); + if (!rh) + { + LWLockRelease(RollbackHTLock); + return false; + } + /* We shouldn't try to push the same rollback request again. */ + if (!found) + { + rh->start_urec_ptr = start_urec_ptr; + rh->end_urec_ptr = end_urec_ptr; + rh->dbid = (dbid == InvalidOid) ?
MyDatabaseId : dbid; + } + LWLockRelease(RollbackHTLock); + + return true; +} + +/* + * To perform the undo actions for the transactions whose rollback + * requests are in hash table. Sequentially, scan the hash-table + * and perform the undo-actions for the respective transactions. + * Once, the undo-actions are applied, remove the entry from the + * hash table. + */ +void +RollbackFromHT(Oid dbid) +{ + UndoRecPtr start[ROLLBACK_HT_SIZE]; + UndoRecPtr end[ROLLBACK_HT_SIZE]; + RollbackHashEntry *rh; + HASH_SEQ_STATUS status; + int i = 0; + + /* Fetch the rollback requests */ + LWLockAcquire(RollbackHTLock, LW_SHARED); + + Assert(hash_get_num_entries(RollbackHT) <= ROLLBACK_HT_SIZE); + hash_seq_init(&status, RollbackHT); + while (RollbackHT != NULL && + (rh = (RollbackHashEntry *) hash_seq_search(&status)) != NULL) + { + if (rh->dbid == dbid) + { + start[i] = rh->start_urec_ptr; + end[i] = rh->end_urec_ptr; + i++; + } + } + + LWLockRelease(RollbackHTLock); + + /* Execute the rollback requests */ + while(--i >= 0) + { + Assert(UndoRecPtrIsValid(start[i])); + Assert(UndoRecPtrIsValid(end[i])); + + StartTransactionCommand(); + execute_undo_actions(start[i], end[i], true, false, true); + CommitTransactionCommand(); + } +} + +/* + * Remove the rollback request entry from the rollback hash table. + */ +static void +RollbackHTRemoveEntry(UndoRecPtr start_urec_ptr) +{ + LWLockAcquire(RollbackHTLock, LW_EXCLUSIVE); + + hash_search(RollbackHT, &start_urec_ptr, HASH_REMOVE, NULL); + + LWLockRelease(RollbackHTLock); +} + +/* + * To check if the rollback requests in the hash table are all + * completed or not. This is required because we don't want to + * expose RollbackHT in xact.c, where it is required to ensure + * that we push the requests only when there is some space in + * the hash-table.
+ */ +bool +RollbackHTIsFull(void) +{ + bool result = false; + + LWLockAcquire(RollbackHTLock, LW_SHARED); + + if (hash_get_num_entries(RollbackHT) >= ROLLBACK_HT_SIZE) + result = true; + + LWLockRelease(RollbackHTLock); + + return result; +} + +/* + * Get database list from the rollback hash table. + */ +List * +RollbackHTGetDBList() +{ + HASH_SEQ_STATUS status; + RollbackHashEntry *rh; + List *dblist = NIL; + + /* Fetch the rollback requests */ + LWLockAcquire(RollbackHTLock, LW_SHARED); + + hash_seq_init(&status, RollbackHT); + while (RollbackHT != NULL && + (rh = (RollbackHashEntry *) hash_seq_search(&status)) != NULL) + dblist = list_append_unique_oid(dblist, rh->dbid); + + LWLockRelease(RollbackHTLock); + + return dblist; +} + +/* + * ConditionTransactionUndoActionLock + * + * Insert a lock showing that the undo action for given transaction is in + * progress. This is only done for the main transaction not for the + * sub-transaction. + */ +bool +ConditionTransactionUndoActionLock(TransactionId xid) +{ + LOCKTAG tag; + + SET_LOCKTAG_TRANSACTION_UNDOACTION(tag, xid); + + if (LOCKACQUIRE_NOT_AVAIL == LockAcquire(&tag, ExclusiveLock, false, true)) + return false; + else + return true; +} + +/* + * TransactionUndoActionLockRelease + * + * Delete the lock showing that the undo action given transaction ID is in + * progress. + */ +void +TransactionUndoActionLockRelease(TransactionId xid) +{ + LOCKTAG tag; + + SET_LOCKTAG_TRANSACTION_UNDOACTION(tag, xid); + + LockRelease(&tag, ExclusiveLock, false); +} diff --git a/src/backend/access/undo/undoactionxlog.c b/src/backend/access/undo/undoactionxlog.c new file mode 100644 index 0000000..546f43d --- /dev/null +++ b/src/backend/access/undo/undoactionxlog.c @@ -0,0 +1,49 @@ +/*------------------------------------------------------------------------- + * + * undoactionxlog.c + * WAL replay logic for undo actions. 
+ * + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/access/undo/undoactionxlog.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/undoaction_xlog.h" +#include "access/undoinsert.h" +#include "access/visibilitymap.h" +#include "access/xlog.h" +#include "access/xlogutils.h" + +/* + * Replay of undo apply progress. + */ +static void +undo_xlog_apply_progress(XLogReaderState *record) +{ + xl_undoapply_progress *xlrec = (xl_undoapply_progress *) XLogRecGetData(record); + + /* Update the progress in the transaction header. */ + PrepareUpdateUndoActionProgress(xlrec->urec_ptr, xlrec->progress); + UndoRecordUpdateTransInfo(); +} + +void +undoaction_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + switch (info) + { + case XLOG_UNDO_APPLY_PROGRESS: + undo_xlog_apply_progress(record); + break; + default: + elog(PANIC, "undoaction_redo: unknown op code %u", info); + } +} diff --git a/src/backend/access/undo/undodiscard.c b/src/backend/access/undo/undodiscard.c new file mode 100644 index 0000000..9d55588 --- /dev/null +++ b/src/backend/access/undo/undodiscard.c @@ -0,0 +1,458 @@ +/*------------------------------------------------------------------------- + * + * undodiscard.c + * discard undo records + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/access/undo/undodiscard.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/transam.h" +#include "access/xlog.h" +#include "access/xact.h" +#include "access/undolog.h" +#include "access/undodiscard.h" +#include "catalog/pg_tablespace.h" +#include "miscadmin.h" +#include 
"storage/block.h" +#include "storage/buf.h" +#include "storage/bufmgr.h" +#include "storage/shmem.h" +#include "storage/proc.h" +#include "utils/resowner.h" +#include "postmaster/undoloop.h" + +static UndoRecPtr FetchLatestUndoPtrForXid(UndoRecPtr urecptr, + UnpackedUndoRecord *uur_start, + UndoLogControl *log); + +/* + * Discard the undo for the log + * + * Search the undo log, get the start record for each transaction until we get + * the transaction with xid >= xmin or an invalid xid. Then call undolog + * routine to discard upto that point and update the memory structure for the + * log slot. We set the hibernate flag if we do not have any undo logs, this + * flag is passed to the undo worker wherein it determines if system is idle + * and it should sleep for sometime. + * + * Return the oldest xid remaining in this undo log (which should be >= xmin, + * since we'll discard everything older). Return InvalidTransactionId if the + * undo log is empty. + */ +static TransactionId +UndoDiscardOneLog(UndoLogControl *log, TransactionId xmin, bool *hibernate) +{ + UndoRecPtr undo_recptr, next_insert, from_urecptr; + UndoRecPtr next_urecptr = InvalidUndoRecPtr; + UnpackedUndoRecord *uur = NULL; + bool need_discard = false; + bool log_complete = false; + TransactionId undoxid = InvalidTransactionId; + TransactionId xid = log->oldest_xid; + TransactionId latest_discardxid = InvalidTransactionId; + uint32 epoch = 0; + + undo_recptr = log->oldest_data; + + /* There might not be any undo log and hibernation might be needed. */ + *hibernate = true; + + /* Loop until we run out of discardable transactions. */ + do + { + bool pending_abort = false; + + next_insert = UndoLogGetNextInsertPtr(log->logno, xid); + + /* + * If the next insert location in the undo log is same as the oldest + * data for the log then there is nothing more to discard in this log + * so discard upto this point. 
+ */ + if (next_insert == undo_recptr) + { + /* + * If the discard location and the insert location is same then + * there is nothing to discard. + */ + if (undo_recptr == log->oldest_data) + break; + else + log_complete = true; + } + else + { + /* Fetch the undo record for given undo_recptr. */ + uur = UndoFetchRecord(undo_recptr, InvalidBlockNumber, + InvalidOffsetNumber, InvalidTransactionId, + NULL, NULL); + + Assert(uur != NULL); + + if (!TransactionIdDidCommit(uur->uur_xid) && + TransactionIdPrecedes(uur->uur_xid, xmin) && + uur->uur_progress == 0) + { + /* + * At the time of recovery, we might not have a valid next undo + * record pointer and in that case we'll calculate the location + * of from pointer using the last record of next insert + * location. + */ + if (ConditionTransactionUndoActionLock(uur->uur_xid)) + { + TransactionId xid = uur->uur_xid; + UndoLogControl *log = NULL; + UndoLogNumber logno; + + logno = UndoRecPtrGetLogNo(undo_recptr); + log = UndoLogGet(logno, false); + + /* + * If the corresponding log got rewinded to a location + * prior to undo_recptr, the undo actions are already + * applied. + */ + if (log->meta.insert > undo_recptr) + { + UndoRecordRelease(uur); + + /* Fetch the undo record under undo action lock. */ + uur = UndoFetchRecord(undo_recptr, InvalidBlockNumber, + InvalidOffsetNumber, InvalidTransactionId, + NULL, NULL); + /* + * If the undo actions for the aborted transaction is + * already applied then continue discarding the undo log + * otherwise discard till current point and stop processing + * this undo log. + * Also, check this is indeed the transaction id we're + * looking for. It is possible that after rewinding + * some other transaction has inserted an undo record. 
+ */ + if (uur->uur_xid == xid && uur->uur_progress == 0) + { + from_urecptr = FetchLatestUndoPtrForXid(undo_recptr, uur, log); + (void)PushRollbackReq(from_urecptr, undo_recptr, uur->uur_dbid); + pending_abort = true; + } + } + + TransactionUndoActionLockRelease(xid); + } + else + pending_abort = true; + } + + next_urecptr = uur->uur_next; + undoxid = uur->uur_xid; + xid = undoxid; + epoch = uur->uur_xidepoch; + } + + /* we can discard upto this point. */ + if (TransactionIdFollowsOrEquals(undoxid, xmin) || + next_urecptr == SpecialUndoRecPtr || + UndoRecPtrGetLogNo(next_urecptr) != log->logno || + log_complete || pending_abort) + { + /* Hey, I got some undo log to discard, can not hibernate now. */ + *hibernate = false; + + if (uur != NULL) + UndoRecordRelease(uur); + + /* + * If Transaction id is smaller than the xmin that means this must + * be the last transaction in this undo log, so we need to get the + * last insert point in this undo log and discard till that point. + * Also, if the transaction has pending abort, we stop discarding + * undo from the same location. + */ + if (TransactionIdPrecedes(undoxid, xmin) && !pending_abort) + { + UndoRecPtr next_insert = InvalidUndoRecPtr; + + /* + * Get the last insert location for this transaction Id, if it + * returns invalid pointer that means there is new transaction + * has started for this undolog. So we need to refetch the undo + * and continue the process. + */ + next_insert = UndoLogGetNextInsertPtr(log->logno, undoxid); + if (!UndoRecPtrIsValid(next_insert)) + continue; + + undo_recptr = next_insert; + need_discard = true; + epoch = 0; + latest_discardxid = undoxid; + undoxid = InvalidTransactionId; + } + + LWLockAcquire(&log->discard_lock, LW_EXCLUSIVE); + + /* + * If no more pending undo logs then set the oldest transaction to + * InvalidTransactionId. 
+ */ + if (log_complete) + { + log->oldest_xid = InvalidTransactionId; + log->oldest_xidepoch = 0; + } + else + { + log->oldest_xid = undoxid; + log->oldest_xidepoch = epoch; + } + + log->oldest_data = undo_recptr; + LWLockRelease(&log->discard_lock); + + if (need_discard) + UndoLogDiscard(undo_recptr, latest_discardxid); + + break; + } + + /* + * This transaction is smaller than the xmin so lets jump to the next + * transaction. + */ + undo_recptr = next_urecptr; + latest_discardxid = undoxid; + + if(uur != NULL) + { + UndoRecordRelease(uur); + uur = NULL; + } + + need_discard = true; + } while (true); + + return undoxid; +} + +/* + * Discard the undo for all the transaction whose xid is smaller than xmin + * + * Check the DiscardInfo memory array for each slot (every undo log) , process + * the undo log for all the slot which have xid smaller than xmin or invalid + * xid. Fetch the record from the undo log transaction by transaction until we + * find the xid which is not smaller than xmin. + */ +void +UndoDiscard(TransactionId oldestXmin, bool *hibernate) +{ + TransactionId oldestXidHavingUndo = oldestXmin; + uint64 epoch = GetEpochForXid(oldestXmin); + UndoLogControl *log = NULL; + + /* + * TODO: Ideally we'd arrange undo logs so that we can efficiently find + * those with oldest_xid < oldestXmin, but for now we'll just scan all of + * them. + */ + while ((log = UndoLogNext(log))) + { + TransactionId oldest_xid = InvalidTransactionId; + + /* We can't process temporary undo logs. */ + if (log->meta.persistence == UNDO_TEMP) + continue; + + /* + * If the first xid of the undo log is smaller than the xmin the try + * to discard the undo log. + */ + if (TransactionIdPrecedes(log->oldest_xid, oldestXmin)) + { + /* + * If the XID in the discard entry is invalid then start scanning + * from the first valid undorecord in the log. 
+ */ + if (!TransactionIdIsValid(log->oldest_xid)) + { + bool full = false; + UndoRecPtr urp = UndoLogGetFirstValidRecord(log, &full); + + if (!UndoRecPtrIsValid(urp)) + { + /* + * There is nothing to be discarded. If there is also no + * more free space, then a call to UndoLogDiscard() will + * discard it the undo log completely and free up the + * UndoLogControl slot. + */ + if (full) + UndoLogDiscard(MakeUndoRecPtr(log->meta.logno, + log->meta.discard), + InvalidTransactionId); + continue; + } + + LWLockAcquire(&log->discard_lock, LW_SHARED); + log->oldest_data = urp; + LWLockRelease(&log->discard_lock); + } + + /* Process the undo log. */ + oldest_xid = UndoDiscardOneLog(log, oldestXmin, hibernate); + } + + if (TransactionIdIsValid(oldest_xid) && + TransactionIdPrecedes(oldest_xid, oldestXidHavingUndo)) + { + oldestXidHavingUndo = oldest_xid; + epoch = GetEpochForXid(oldest_xid); + } + } + + /* + * Update the oldestXidWithEpochHavingUndo in the shared memory. + * + * XXX In future if multiple worker can perform discard then we may need + * to use compare and swap for updating the shared memory value. + */ + pg_atomic_write_u64(&ProcGlobal->oldestXidWithEpochHavingUndo, + MakeEpochXid(epoch, oldestXidHavingUndo)); +} + +/* + * Fetch the latest urec pointer for the transaction. + */ +UndoRecPtr +FetchLatestUndoPtrForXid(UndoRecPtr urecptr, UnpackedUndoRecord *uur_start, + UndoLogControl *log) +{ + UndoRecPtr next_urecptr, from_urecptr; + uint16 prevlen; + UndoLogOffset next_insert; + UnpackedUndoRecord *uur; + bool refetch = false; + + uur = uur_start; + + while (true) + { + /* fetch the undo record again if required. 
*/ + if (refetch) + { + uur = UndoFetchRecord(urecptr, InvalidBlockNumber, + InvalidOffsetNumber, InvalidTransactionId, + NULL, NULL); + refetch = false; + } + + next_urecptr = uur->uur_next; + prevlen = UndoLogGetPrevLen(log->logno); + + /* + * If this is the last transaction in the log then calculate the latest + * urec pointer using next insert location of the undo log. Otherwise, + * calculate using next transaction's start pointer. + */ + if (uur->uur_next == SpecialUndoRecPtr) + { + /* + * While fetching the next insert location if the new transaction + * has already started in this log then lets re-fetch the undo + * record. + */ + next_insert = UndoLogGetNextInsertPtr(log->logno, uur->uur_xid); + if (!UndoRecPtrIsValid(next_insert)) + { + if (uur != uur_start) + UndoRecordRelease(uur); + refetch = true; + continue; + } + + from_urecptr = UndoGetPrevUndoRecptr(next_insert, prevlen); + break; + } + else if ((UndoRecPtrGetLogNo(next_urecptr) != log->logno) && + UndoLogIsDiscarded(next_urecptr)) + { + /* + * If next_urecptr is in different undolog and its already discarded + * that means the undo actions for this transaction which are in the + * next log has already been executed and we only need to execute + * which are remaining in this log. + */ + next_insert = UndoLogGetNextInsertPtr(log->logno, uur->uur_xid); + + Assert(UndoRecPtrIsValid(next_insert)); + from_urecptr = UndoGetPrevUndoRecptr(next_insert, prevlen); + break; + } + else + { + UnpackedUndoRecord *next_uur; + + next_uur = UndoFetchRecord(next_urecptr, + InvalidBlockNumber, + InvalidOffsetNumber, + InvalidTransactionId, + NULL, NULL); + /* + * If the next_urecptr is in the same log then calculate the + * from pointer using prevlen. 
+ */ + if (UndoRecPtrGetLogNo(next_urecptr) == log->logno) + { + from_urecptr = + UndoGetPrevUndoRecptr(next_urecptr, next_uur->uur_prevlen); + UndoRecordRelease(next_uur); + break; + } + else + { + /* + * The transaction is overflowed to the next log, so restart + * the processing from the next log. + */ + log = UndoLogGet(UndoRecPtrGetLogNo(next_urecptr), false); + if (uur != uur_start) + UndoRecordRelease(uur); + uur = next_uur; + continue; + } + + UndoRecordRelease(next_uur); + } + } + + if (uur != uur_start) + UndoRecordRelease(uur); + + return from_urecptr; +} + +/* + * Discard the undo logs for temp tables. + */ +void +TempUndoDiscard(UndoLogNumber logno) +{ + UndoLogControl *log = UndoLogGet(logno, false); + + /* + * Discard the undo log for temp table only. Ensure that there is + * something to be discarded there. + */ + Assert (log->meta.persistence == UNDO_TEMP); + + /* Process the undo log. */ + UndoLogDiscard(MakeUndoRecPtr(log->logno, log->meta.insert), + InvalidTransactionId); +} diff --git a/src/backend/access/undo/undoinsert.c b/src/backend/access/undo/undoinsert.c index 2453cad..178b01b 100644 --- a/src/backend/access/undo/undoinsert.c +++ b/src/backend/access/undo/undoinsert.c @@ -133,7 +133,6 @@ static UnpackedUndoRecord* UndoGetOneRecord(UnpackedUndoRecord *urec, UndoPersistence persistence); static void PrepareUndoRecordUpdateTransInfo(UndoRecPtr urecptr, bool log_switched); -static void UndoRecordUpdateTransInfo(void); static int InsertFindBufferSlot(RelFileNode rnode, BlockNumber blk, ReadBufferMode rbm, UndoPersistence persistence); @@ -271,12 +270,60 @@ PrepareUndoRecordUpdateTransInfo(UndoRecPtr urecptr, bool log_switched) } /* + * Update the progress of the undo record in the transaction header. 
+ */ +void +PrepareUpdateUndoActionProgress(UndoRecPtr urecptr, int progress) +{ + Buffer buffer = InvalidBuffer; + BlockNumber cur_blk; + RelFileNode rnode; + UndoLogNumber logno = UndoRecPtrGetLogNo(urecptr); + UndoLogControl *log; + Page page; + int already_decoded = 0; + int starting_byte; + int bufidx; + int index = 0; + + log = UndoLogGet(logno, false); + + if (log->meta.persistence == UNDO_TEMP) + return; + + UndoRecPtrAssignRelFileNode(rnode, urecptr); + cur_blk = UndoRecPtrGetBlockNum(urecptr); + starting_byte = UndoRecPtrGetPageOffset(urecptr); + + while (true) + { + bufidx = InsertFindBufferSlot(rnode, cur_blk, + RBM_NORMAL, + log->meta.persistence); + prev_txn_info.prev_txn_undo_buffers[index] = bufidx; + buffer = undo_buffer[bufidx].buf; + page = BufferGetPage(buffer); + index++; + + if (UnpackUndoRecord(&prev_txn_info.uur, page, starting_byte, + &already_decoded, true)) + break; + + starting_byte = UndoLogBlockHeaderSize; + cur_blk++; + } + + prev_txn_info.prev_urecptr = urecptr; + prev_txn_info.uur.uur_progress = progress; +} + +/* * Overwrite the first undo record of the previous transaction to update its * next pointer. This will just insert the already prepared record by * PrepareUndoRecordUpdateTransInfo. This must be called under the critical * section. This will just overwrite the undo header not the data. 
*/ -static void +void UndoRecordUpdateTransInfo(void) { UndoLogNumber logno = UndoRecPtrGetLogNo(prev_txn_info.prev_urecptr); diff --git a/src/backend/access/undo/undorecord.c b/src/backend/access/undo/undorecord.c index 33bb153..5928784 100644 --- a/src/backend/access/undo/undorecord.c +++ b/src/backend/access/undo/undorecord.c @@ -95,6 +95,7 @@ InsertUndoRecord(UnpackedUndoRecord *uur, Page page, */ if (*already_written == 0) { + work_hdr.urec_rmid = uur->uur_rmid; work_hdr.urec_type = uur->uur_type; work_hdr.urec_info = uur->uur_info; work_hdr.urec_prevlen = uur->uur_prevlen; @@ -120,6 +121,7 @@ InsertUndoRecord(UnpackedUndoRecord *uur, Page page, * We should have been passed the same record descriptor as before, * or caller has messed up. */ + Assert(work_hdr.urec_rmid == uur->uur_rmid); Assert(work_hdr.urec_type == uur->uur_type); Assert(work_hdr.urec_info == uur->uur_info); Assert(work_hdr.urec_prevlen == uur->uur_prevlen); @@ -282,6 +284,7 @@ bool UnpackUndoRecord(UnpackedUndoRecord *uur, Page page, int starting_byte, &my_bytes_decoded, already_decoded, false)) return false; + uur->uur_rmid = work_hdr.urec_rmid; uur->uur_type = work_hdr.urec_type; uur->uur_info = work_hdr.urec_info; uur->uur_prevlen = work_hdr.urec_prevlen; diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index a86963f..8548a65 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -1381,6 +1381,7 @@ vac_truncate_clog(TransactionId frozenXID, MultiXactId lastSaneMinMulti) { TransactionId nextXID = ReadNewTransactionId(); + TransactionId oldestXidHavingUndo; Relation relation; HeapScanDesc scan; HeapTuple tuple; @@ -1475,6 +1476,16 @@ vac_truncate_clog(TransactionId frozenXID, return; /* + * We can't truncate the clog for transactions that still have undo. The + * oldestXidHavingUndo will be only valid for zheap storage engine, so it + * won't impact any other storage engine. 
+ */ + oldestXidHavingUndo = GetXidFromEpochXid( + pg_atomic_read_u64(&ProcGlobal->oldestXidWithEpochHavingUndo)); + if (TransactionIdIsValid(oldestXidHavingUndo)) + frozenXID = Min(frozenXID, oldestXidHavingUndo); + + /* * Advance the oldest value for commit timestamps before truncating, so * that if a user requests a timestamp for a transaction we're truncating * away right after this point, they get NULL instead of an ugly "file not diff --git a/src/backend/postmaster/Makefile b/src/backend/postmaster/Makefile index 71c2321..9ce6ff0 100644 --- a/src/backend/postmaster/Makefile +++ b/src/backend/postmaster/Makefile @@ -12,7 +12,7 @@ subdir = src/backend/postmaster top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o fork_process.o \ - pgarch.o pgstat.o postmaster.o startup.o syslogger.o walwriter.o +OBJS = autovacuum.o bgworker.o bgwriter.o checkpointer.o discardworker.o fork_process.o \ + pgarch.o pgstat.o postmaster.o startup.o syslogger.o undoworker.o walwriter.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index d2b695e..49df516 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -20,7 +20,9 @@ #include "pgstat.h" #include "port/atomics.h" #include "postmaster/bgworker_internals.h" +#include "postmaster/discardworker.h" #include "postmaster/postmaster.h" +#include "postmaster/undoworker.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" #include "storage/dsm.h" @@ -129,6 +131,15 @@ static const struct }, { "ApplyWorkerMain", ApplyWorkerMain + }, + { + "UndoLauncherMain", UndoLauncherMain + }, + { + "UndoWorkerMain", UndoWorkerMain + }, + { + "DiscardWorkerMain", DiscardWorkerMain } }; diff --git a/src/backend/postmaster/discardworker.c b/src/backend/postmaster/discardworker.c new file mode 100644 index 0000000..e4c6719 --- /dev/null +++ 
b/src/backend/postmaster/discardworker.c @@ -0,0 +1,170 @@ +/*------------------------------------------------------------------------- + * + * discardworker.c + * The undo discard worker for asynchronous undo management. + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/postmaster/discardworker.c +------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include <unistd.h> + +/* These are always necessary for a bgworker. */ +#include "access/transam.h" +#include "miscadmin.h" +#include "postmaster/bgworker.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/lwlock.h" +#include "storage/proc.h" +#include "storage/shmem.h" + +#include "access/undodiscard.h" +#include "pgstat.h" +#include "postmaster/discardworker.h" +#include "storage/procarray.h" +#include "tcop/tcopprot.h" +#include "utils/guc.h" +#include "utils/resowner.h" + +static void undoworker_sigterm_handler(SIGNAL_ARGS); + +/* minimum sleep time between cycles (100 milliseconds) */ +#define MIN_NAPTIME_PER_CYCLE 100L +#define DELAYED_NAPTIME 10 * MIN_NAPTIME_PER_CYCLE +#define MAX_NAPTIME_PER_CYCLE 100 * MIN_NAPTIME_PER_CYCLE + +static bool got_SIGTERM = false; +static bool hibernate = false; +static long wait_time = MIN_NAPTIME_PER_CYCLE; + +/* SIGTERM: set flag to exit at next convenient time */ +static void +undoworker_sigterm_handler(SIGNAL_ARGS) +{ + got_SIGTERM = true; + + /* Waken anything waiting on the process latch */ + SetLatch(MyLatch); +} + +/* + * DiscardWorkerRegister -- Register an undo discard worker. + */ +void +DiscardWorkerRegister(void) +{ + BackgroundWorker bgw; + + /* TODO: This should be configurable. 
*/ + + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + snprintf(bgw.bgw_name, BGW_MAXLEN, "discard worker"); + sprintf(bgw.bgw_library_name, "postgres"); + sprintf(bgw.bgw_function_name, "DiscardWorkerMain"); + bgw.bgw_restart_time = 5; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&bgw); +} + +/* + * DiscardWorkerMain -- Main loop for the undo discard worker. + */ +void +DiscardWorkerMain(Datum main_arg) +{ + ereport(LOG, + (errmsg("discard worker started"))); + + /* Establish signal handlers. */ + pqsignal(SIGTERM, undoworker_sigterm_handler); + BackgroundWorkerUnblockSignals(); + + /* Make it easy to identify our processes. */ + SetConfigOption("application_name", MyBgworkerEntry->bgw_name, + PGC_USERSET, PGC_S_SESSION); + + /* + * Create resource owner for discard worker as it needs to read the undo + * records outside the transaction blocks which in turn access buffer read + * routine. + */ + CreateAuxProcessResourceOwner(); + + /* Enter main loop */ + while (!got_SIGTERM) + { + int rc; + TransactionId OldestXmin, oldestXidHavingUndo; + + OldestXmin = GetOldestXmin(NULL, PROCARRAY_FLAGS_DEFAULT); + + oldestXidHavingUndo = GetXidFromEpochXid( + pg_atomic_read_u64(&ProcGlobal->oldestXidWithEpochHavingUndo)); + + /* + * Call the discard routine if oldestXidHavingUndo is lagging + * behind OldestXmin. + */ + if (OldestXmin != InvalidTransactionId && + TransactionIdPrecedes(oldestXidHavingUndo, OldestXmin)) + { + UndoDiscard(OldestXmin, &hibernate); + + /* + * If we got some undo logs to discard or discarded something, + * then reset the wait_time as we have got work to do. + * Note that if there are some undologs that cannot be discarded, + * then above condition will remain unsatisfied till oldestXmin + * remains unchanged and the wait_time will not reset in that case. 
+ */ + if (!hibernate) + wait_time = MIN_NAPTIME_PER_CYCLE; + } + + /* Wait for more work. */ + rc = WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + wait_time, + WAIT_EVENT_UNDO_DISCARD_WORKER_MAIN); + + ResetLatch(&MyProc->procLatch); + + /* + * Increase the wait_time based on the length of inactivity. If wait_time + * is within one second, then increment it by 100 ms at a time. Henceforth, + * increment it one second at a time, till it reaches ten seconds. Never + * increase the wait_time more than ten seconds, it will be too much of + * waiting otherwise. + */ + if (rc & WL_TIMEOUT && hibernate) + { + wait_time += (wait_time < DELAYED_NAPTIME ? + MIN_NAPTIME_PER_CYCLE : DELAYED_NAPTIME); + if (wait_time > MAX_NAPTIME_PER_CYCLE) + wait_time = MAX_NAPTIME_PER_CYCLE; + } + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + } + + ReleaseAuxProcessResources(true); + + /* we're done */ + ereport(LOG, + (errmsg("discard worker shutting down"))); + + proc_exit(0); +} diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index dc86307..f4b0492 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3502,6 +3502,12 @@ pgstat_get_wait_activity(WaitEventActivity w) case WAIT_EVENT_SYSLOGGER_MAIN: event_name = "SysLoggerMain"; break; + case WAIT_EVENT_UNDO_DISCARD_WORKER_MAIN: + event_name = "UndoDiscardWorkerMain"; + break; + case WAIT_EVENT_UNDO_LAUNCHER_MAIN: + event_name = "UndoLauncherMain"; + break; case WAIT_EVENT_WAL_RECEIVER_MAIN: event_name = "WalReceiverMain"; break; @@ -3914,7 +3920,6 @@ pgstat_get_wait_io(WaitEventIO w) case WAIT_EVENT_UNDO_FILE_SYNC: event_name = "UndoFileSync"; break; - case WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ: event_name = "WALSenderTimelineHistoryRead"; break; diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 688f462..603c733 100644 --- 
a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -111,10 +111,12 @@ #include "port/pg_bswap.h" #include "postmaster/autovacuum.h" #include "postmaster/bgworker_internals.h" +#include "postmaster/discardworker.h" #include "postmaster/fork_process.h" #include "postmaster/pgarch.h" #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" +#include "postmaster/undoworker.h" #include "replication/logicallauncher.h" #include "replication/walsender.h" #include "storage/fd.h" @@ -978,6 +980,11 @@ PostmasterMain(int argc, char *argv[]) */ ApplyLauncherRegister(); + UndoLauncherRegister(); + + /* Register the Undo Discard worker. */ + DiscardWorkerRegister(); + /* * process any libraries that should be preloaded at postmaster start */ diff --git a/src/backend/postmaster/undoworker.c b/src/backend/postmaster/undoworker.c new file mode 100644 index 0000000..caa7db1 --- /dev/null +++ b/src/backend/postmaster/undoworker.c @@ -0,0 +1,664 @@ +/*------------------------------------------------------------------------- + * + * undoworker.c + * undo launcher and undo worker process. 
+ * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/postmaster/undoworker.c + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "funcapi.h" +#include "miscadmin.h" +#include "pgstat.h" + +#include "access/heapam.h" +#include "access/htup.h" +#include "access/htup_details.h" +#include "access/sysattr.h" +#include "access/xact.h" + +#include "catalog/indexing.h" +#include "catalog/pg_database.h" + +#include "libpq/pqsignal.h" + +#include "postmaster/bgworker.h" +#include "postmaster/fork_process.h" +#include "postmaster/postmaster.h" +#include "postmaster/undoloop.h" +#include "postmaster/undoworker.h" + +#include "replication/slot.h" +#include "replication/worker_internal.h" + +#include "storage/ipc.h" +#include "storage/lmgr.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/procsignal.h" + +#include "tcop/tcopprot.h" + +#include "utils/fmgroids.h" +#include "utils/hsearch.h" +#include "utils/memutils.h" +#include "utils/resowner.h" + +/* max sleep time between cycles (100 milliseconds) */ +#define DEFAULT_NAPTIME_PER_CYCLE 100L +#define DEFAULT_RETRY_NAPTIME 50L + +int max_undo_workers = 5; + +typedef struct UndoApplyWorker +{ + /* Indicates if this slot is used or free. */ + bool in_use; + + /* Increased everytime the slot is taken by new worker. */ + uint16 generation; + + /* Pointer to proc array. NULL if not running. */ + PGPROC *proc; + + /* Database id to connect to. */ + Oid dbid; +} UndoApplyWorker; + +UndoApplyWorker *MyUndoWorker = NULL; + +typedef struct UndoApplyCtxStruct +{ + /* Supervisor process. */ + pid_t launcher_pid; + + /* Background workers. 
*/ + UndoApplyWorker workers[FLEXIBLE_ARRAY_MEMBER]; +} UndoApplyCtxStruct; + +UndoApplyCtxStruct *UndoApplyCtx; + +static void undo_worker_onexit(int code, Datum arg); +static void undo_worker_cleanup(UndoApplyWorker *worker); + +static volatile sig_atomic_t got_SIGHUP = false; + +/* + * Wait for a background worker to start up and attach to the shmem context. + * + * This is only needed for cleaning up the shared memory in case the worker + * fails to attach. + */ +static void +WaitForUndoWorkerAttach(UndoApplyWorker *worker, + uint16 generation, + BackgroundWorkerHandle *handle) +{ + BgwHandleStatus status; + int rc; + + for (;;) + { + pid_t pid; + + CHECK_FOR_INTERRUPTS(); + + LWLockAcquire(UndoWorkerLock, LW_SHARED); + + /* Worker either died or has started; no need to do anything. */ + if (!worker->in_use || worker->proc) + { + LWLockRelease(UndoWorkerLock); + return; + } + + LWLockRelease(UndoWorkerLock); + + /* Check if worker has died before attaching, and clean up after it. */ + status = GetBackgroundWorkerPid(handle, &pid); + + if (status == BGWH_STOPPED) + { + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + /* Ensure that this was indeed the worker we waited for. */ + if (generation == worker->generation) + undo_worker_cleanup(worker); + LWLockRelease(UndoWorkerLock); + return; + } + + /* + * We need timeout because we generally don't get notified via latch + * about the worker attach. But we don't expect to have to wait long. + */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + 10L, WAIT_EVENT_BGWORKER_STARTUP); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + } + + return; +} + +/* + * Get dbid from the worker slot. + */ +static Oid +slot_get_dbid(int slot) +{ + Oid dbid; + + /* Block concurrent access. 
*/ + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + + MyUndoWorker = &UndoApplyCtx->workers[slot]; + + if (!MyUndoWorker->in_use) + { + LWLockRelease(UndoWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("undo worker slot %d is empty,", + slot))); + } + + dbid = MyUndoWorker->dbid; + + LWLockRelease(UndoWorkerLock); + + return dbid; +} + +/* + * Attach to a slot. + */ +static void +undo_worker_attach(int slot) +{ + /* Block concurrent access. */ + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + + MyUndoWorker = &UndoApplyCtx->workers[slot]; + + if (!MyUndoWorker->in_use) + { + LWLockRelease(UndoWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("undo worker slot %d is empty, cannot attach", + slot))); + } + + if (MyUndoWorker->proc) + { + LWLockRelease(UndoWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("undo worker slot %d is already used by " + "another worker, cannot attach", slot))); + } + + MyUndoWorker->proc = MyProc; + before_shmem_exit(undo_worker_onexit, (Datum) 0); + + LWLockRelease(UndoWorkerLock); +} + +/* + * Walks the workers array and searches for one that matches given + * dbid. + */ +static UndoApplyWorker * +undo_worker_find(Oid dbid) +{ + int i; + UndoApplyWorker *res = NULL; + + Assert(LWLockHeldByMe(UndoWorkerLock)); + + /* Search for attached worker for a given db id. */ + for (i = 0; i < max_undo_workers; i++) + { + UndoApplyWorker *w = &UndoApplyCtx->workers[i]; + + if (w->in_use && w->dbid == dbid) + { + res = w; + break; + } + } + + return res; +} + +/* + * Check whether the dbid exist or not. + * + * Refer comments from GetDatabaseTupleByOid. + * FIXME: Should we expose GetDatabaseTupleByOid and directly use it. 
+ */ +static bool +dbid_exist(Oid dboid) +{ + HeapTuple tuple; + Relation relation; + SysScanDesc scan; + ScanKeyData key[1]; + bool result = false; + /* + * form a scan key + */ + ScanKeyInit(&key[0], + ObjectIdAttributeNumber, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(dboid)); + + relation = heap_open(DatabaseRelationId, AccessShareLock); + scan = systable_beginscan(relation, DatabaseOidIndexId, + criticalSharedRelcachesBuilt, + NULL, + 1, key); + + tuple = systable_getnext(scan); + + if (HeapTupleIsValid(tuple)) + result = true; + + /* all done */ + systable_endscan(scan); + heap_close(relation, AccessShareLock); + + return result; +} + +/* + * Start new undo apply background worker, if possible otherwise return false. + */ +static bool +undo_worker_launch(Oid dbid) +{ + BackgroundWorker bgw; + BackgroundWorkerHandle *bgw_handle; + uint16 generation; + int i; + int slot = 0; + UndoApplyWorker *worker = NULL; + + /* + * We need to do the modification of the shared memory under lock so that + * we have consistent view. + */ + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + + /* Find unused worker slot. */ + for (i = 0; i < max_undo_workers; i++) + { + UndoApplyWorker *w = &UndoApplyCtx->workers[i]; + + if (!w->in_use) + { + worker = w; + slot = i; + break; + } + } + + /* There are no more free worker slots */ + if (worker == NULL) + return false; + + /* Prepare the worker slot. */ + worker->in_use = true; + worker->proc = NULL; + worker->dbid = dbid; + worker->generation++; + + generation = worker->generation; + LWLockRelease(UndoWorkerLock); + + /* Register the new dynamic worker. 
*/ + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "UndoWorkerMain"); + snprintf(bgw.bgw_type, BGW_MAXLEN, "undo apply worker"); + snprintf(bgw.bgw_name, BGW_MAXLEN, "undo apply worker"); + + bgw.bgw_restart_time = BGW_NEVER_RESTART; + bgw.bgw_notify_pid = MyProcPid; + bgw.bgw_main_arg = Int32GetDatum(slot); + + StartTransactionCommand(); + /* Check the database exists or not. */ + if (!dbid_exist(dbid)) + { + CommitTransactionCommand(); + + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + undo_worker_cleanup(worker); + LWLockRelease(UndoWorkerLock); + return true; + } + + /* + * Acquire database object lock before launching the worker so that it + * doesn't get dropped while worker is connecting to the database. + */ + LockSharedObject(DatabaseRelationId, dbid, 0, RowExclusiveLock); + + /* Recheck whether database still exists or not. */ + if (!dbid_exist(dbid)) + { + CommitTransactionCommand(); + + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + undo_worker_cleanup(worker); + LWLockRelease(UndoWorkerLock); + return true; + } + + if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) + { + /* Failed to start worker, so clean up the worker slot. */ + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + undo_worker_cleanup(worker); + LWLockRelease(UndoWorkerLock); + + UnlockSharedObject(DatabaseRelationId, dbid, 0, RowExclusiveLock); + CommitTransactionCommand(); + + return false; + } + + /* Now wait until it attaches. */ + WaitForUndoWorkerAttach(worker, generation, bgw_handle); + + /* + * By this point the undo-worker has already connected to the database so we + * can release the database lock. 
+ */ + UnlockSharedObject(DatabaseRelationId, dbid, 0, RowExclusiveLock); + CommitTransactionCommand(); + + return true; +} + +/* + * Detach the worker (cleans up the worker info). + */ +static void +undo_worker_detach(void) +{ + /* Block concurrent access. */ + LWLockAcquire(UndoWorkerLock, LW_EXCLUSIVE); + + undo_worker_cleanup(MyUndoWorker); + + LWLockRelease(UndoWorkerLock); +} + +/* + * Clean up worker info. + */ +static void +undo_worker_cleanup(UndoApplyWorker *worker) +{ + Assert(LWLockHeldByMeInMode(UndoWorkerLock, LW_EXCLUSIVE)); + + worker->in_use = false; + worker->proc = NULL; + worker->dbid = InvalidOid; +} + +/* + * Cleanup function for undo worker launcher. + * + * Called on undo worker launcher exit. + */ +static void +undo_launcher_onexit(int code, Datum arg) +{ + UndoApplyCtx->launcher_pid = 0; +} + +/* SIGHUP: set flag to reload configuration at next convenient time */ +static void +undo_launcher_sighup(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGHUP = true; + + /* Waken anything waiting on the process latch */ + SetLatch(MyLatch); + + errno = save_errno; +} + +/* + * Cleanup function. + * + * Called on logical replication worker exit. + */ +static void +undo_worker_onexit(int code, Datum arg) +{ + undo_worker_detach(); +} + +/* + * UndoLauncherShmemSize + * Compute space needed for undo launcher shared memory + */ +Size +UndoLauncherShmemSize(void) +{ + Size size; + + /* + * Need the fixed struct and the array of LogicalRepWorker. + */ + size = sizeof(UndoApplyCtxStruct); + size = MAXALIGN(size); + size = add_size(size, mul_size(max_undo_workers, + sizeof(UndoApplyWorker))); + return size; +} + +/* + * UndoLauncherRegister + * Register a background worker running the undo worker launcher. 
+ */ +void +UndoLauncherRegister(void) +{ + BackgroundWorker bgw; + + if (max_undo_workers == 0) + return; + + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "UndoLauncherMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "undo worker launcher"); + snprintf(bgw.bgw_type, BGW_MAXLEN, + "undo worker launcher"); + bgw.bgw_restart_time = 5; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&bgw); +} + +/* + * UndoLauncherShmemInit + * Allocate and initialize undo worker launcher shared memory + */ +void +UndoLauncherShmemInit(void) +{ + bool found; + + UndoApplyCtx = (UndoApplyCtxStruct *) + ShmemInitStruct("Undo Worker Launcher Data", + UndoLauncherShmemSize(), + &found); + + if (!found) + memset(UndoApplyCtx, 0, UndoLauncherShmemSize()); +} + +/* + * Main loop for the undo worker launcher process. + */ +void +UndoLauncherMain(Datum main_arg) +{ + MemoryContext tmpctx; + MemoryContext oldctx; + + ereport(DEBUG1, + (errmsg("undo launcher started"))); + + before_shmem_exit(undo_launcher_onexit, (Datum) 0); + + Assert(UndoApplyCtx->launcher_pid == 0); + UndoApplyCtx->launcher_pid = MyProcPid; + + /* Establish signal handlers. */ + pqsignal(SIGHUP, undo_launcher_sighup); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* + * Establish connection to nailed catalogs (we only ever access + * pg_subscription). + */ + BackgroundWorkerInitializeConnection(NULL, NULL, 0); + + /* Use temporary context for the database list and worker info. */ + tmpctx = AllocSetContextCreate(TopMemoryContext, + "Undo worker Launcher context", + ALLOCSET_DEFAULT_SIZES); + /* Enter main loop */ + for (;;) + { + int rc; + List *dblist; + ListCell *l; + + CHECK_FOR_INTERRUPTS(); + + /* switch to the temp context. 
*/ + oldctx = MemoryContextSwitchTo(tmpctx); + dblist = RollbackHTGetDBList(); + + foreach(l, dblist) + { + UndoApplyWorker *w; + Oid dbid = lfirst_oid(l); + + LWLockAcquire(UndoWorkerLock, LW_SHARED); + w = undo_worker_find(dbid); + LWLockRelease(UndoWorkerLock); + + if (w == NULL) + { +retry: + if (!undo_worker_launch(dbid)) + { + /* Could not launch the worker, retry after sometime, */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + DEFAULT_RETRY_NAPTIME, + WAIT_EVENT_UNDO_LAUNCHER_MAIN); + goto retry; + } + } + } + + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + + /* Clean the temporary memory. */ + MemoryContextReset(tmpctx); + + /* Wait for more work. */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + DEFAULT_NAPTIME_PER_CYCLE, + WAIT_EVENT_UNDO_LAUNCHER_MAIN); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + } +} + +/* + * UndoWorkerMain -- Main loop for the undo apply worker. + */ +void +UndoWorkerMain(Datum main_arg) +{ + int worker_slot = DatumGetInt32(main_arg); + Oid dbid; + + dbid = slot_get_dbid(worker_slot); + + /* Setup signal handling */ + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* Connect to the database. */ + BackgroundWorkerInitializeConnectionByOid(dbid, 0, 0); + + /* Attach to slot */ + undo_worker_attach(worker_slot); + + /* + * Create resource owner for undo worker. Undo worker need this as it + * need to read the undo records outside the transaction blocks which + * intern access buffer read routine. 
+ */ + CreateAuxProcessResourceOwner(); + + RollbackFromHT(dbid); + + ReleaseAuxProcessResources(true); + + proc_exit(0); +} diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index f60ecc5..49e1028 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -159,6 +159,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), buf.origptr); break; + case RM_UNDOACTION_ID: + /* Logical decoding is not yet implemented for undoactions. */ + Assert(0); + break; case RM_NEXT_ID: elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record)); } diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 4725cbe..1470041 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -29,6 +29,8 @@ #include "postmaster/bgworker_internals.h" #include "postmaster/bgwriter.h" #include "postmaster/postmaster.h" +#include "postmaster/undoloop.h" +#include "postmaster/undoworker.h" #include "replication/logicallauncher.h" #include "replication/slot.h" #include "replication/walreceiver.h" @@ -152,6 +154,8 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); size = add_size(size, BackendRandomShmemSize()); + size = add_size(size, RollbackHTSize()); + size = add_size(size, UndoLauncherShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -226,6 +230,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) SUBTRANSShmemInit(); MultiXactShmemInit(); InitBufferPool(); + InitRollbackHashTable(); /* * Set up lock manager @@ -264,6 +269,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) WalSndShmemInit(); WalRcvShmemInit(); ApplyLauncherShmemInit(); + UndoLauncherShmemInit(); /* * Set up 
other modules that need some shared memory space diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index 554af46..52f6959 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -51,3 +51,5 @@ BackendRandomLock 43 LogicalRepWorkerLock 44 CLogTruncationLock 45 UndoLogLock 46 +RollbackHTLock 47 +UndoWorkerLock 48 diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 6f9aaa5..3138b04 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -279,6 +279,8 @@ InitProcGlobal(void) /* Create ProcStructLock spinlock, too */ ProcStructLock = (slock_t *) ShmemAlloc(sizeof(slock_t)); SpinLockInit(ProcStructLock); + + pg_atomic_init_u64(&ProcGlobal->oldestXidWithEpochHavingUndo, 0); } /* diff --git a/src/backend/utils/adt/lockfuncs.c b/src/backend/utils/adt/lockfuncs.c index 66c09a1..67dda39 100644 --- a/src/backend/utils/adt/lockfuncs.c +++ b/src/backend/utils/adt/lockfuncs.c @@ -32,6 +32,7 @@ const char *const LockTagTypeNames[] = { "virtualxid", "speculative token", "object", + "undoaction", "userlock", "advisory" }; diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index c693977..12e7704 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -121,6 +121,7 @@ bool allowSystemTableMods = false; int work_mem = 1024; int maintenance_work_mem = 16384; int max_parallel_maintenance_workers = 2; +int rollback_overflow_size = 64; /* * Primary determinants of sizes of shared-memory structures. 
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 287ca00..85a7f11 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2806,6 +2806,17 @@ static struct config_int ConfigureNamesInt[] = }, { + {"rollback_overflow_size", PGC_USERSET, RESOURCES_MEM, + gettext_noop("Rollbacks greater than this size are done lazily"), + NULL, + GUC_UNIT_MB + }, + &rollback_overflow_size, + 64, 0, MAX_KILOBYTES, + NULL, NULL, NULL + }, + + { {"wal_segment_size", PGC_INTERNAL, PRESET_OPTIONS, gettext_noop("Shows the size of write ahead log segments."), NULL, diff --git a/src/backend/utils/misc/pg_controldata.c b/src/backend/utils/misc/pg_controldata.c index 3fc8b6a..5a5d98c 100644 --- a/src/backend/utils/misc/pg_controldata.c +++ b/src/backend/utils/misc/pg_controldata.c @@ -78,8 +78,8 @@ pg_control_system(PG_FUNCTION_ARGS) Datum pg_control_checkpoint(PG_FUNCTION_ARGS) { - Datum values[19]; - bool nulls[19]; + Datum values[20]; + bool nulls[20]; TupleDesc tupdesc; HeapTuple htup; ControlFileData *ControlFile; @@ -128,6 +128,8 @@ pg_control_checkpoint(PG_FUNCTION_ARGS) XIDOID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 18, "checkpoint_time", TIMESTAMPTZOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 19, "oldest_xid_with_epoch_having_undo", + TIMESTAMPTZOID, -1, 0); tupdesc = BlessTupleDesc(tupdesc); /* Read the control file. 
*/ @@ -202,6 +204,9 @@ pg_control_checkpoint(PG_FUNCTION_ARGS) time_t_to_timestamptz(ControlFile->checkPointCopy.time)); nulls[17] = false; + values[18] = Int64GetDatum(ControlFile->checkPointCopy.oldestXidWithEpochHavingUndo); + nulls[18] = false; + htup = heap_form_tuple(tupdesc, values, nulls); PG_RETURN_DATUM(HeapTupleGetDatum(htup)); diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 4e61bc6..f3bf0fd 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -686,4 +686,11 @@ # CUSTOMIZED OPTIONS #------------------------------------------------------------------------------ +# If often there are large transactions requiring rollbacks, then we can push +# them to undo-workers for better performance. The size specifeid by the +# parameter below, determines the minimum size of the rollback requests to be +# sent to the undo-worker. +# +#rollback_overflow_size = 64 + # Add settings for extensions here diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c index 895a51f..753929e 100644 --- a/src/bin/pg_controldata/pg_controldata.c +++ b/src/bin/pg_controldata/pg_controldata.c @@ -278,6 +278,8 @@ main(int argc, char *argv[]) ControlFile->checkPointCopy.oldestCommitTsXid); printf(_("Latest checkpoint's newestCommitTsXid:%u\n"), ControlFile->checkPointCopy.newestCommitTsXid); + printf(_("Latest checkpoint's oldestXidWithEpochHavingUndo:" UINT64_FORMAT "\n"), + ControlFile->checkPointCopy.oldestXidWithEpochHavingUndo); printf(_("Time of latest checkpoint: %s\n"), ckpttime_str); printf(_("Fake LSN counter for unlogged rels: %X/%X\n"), diff --git a/src/bin/pg_resetwal/pg_resetwal.c b/src/bin/pg_resetwal/pg_resetwal.c index 6fb403a..72714dd 100644 --- a/src/bin/pg_resetwal/pg_resetwal.c +++ b/src/bin/pg_resetwal/pg_resetwal.c @@ -448,6 +448,7 @@ main(int argc, char *argv[]) if (ControlFile.checkPointCopy.oldestXid < 
FirstNormalTransactionId) ControlFile.checkPointCopy.oldestXid += FirstNormalTransactionId; ControlFile.checkPointCopy.oldestXidDB = InvalidOid; + ControlFile.checkPointCopy.oldestXidWithEpochHavingUndo = 0; } if (set_oldest_commit_ts_xid != 0) @@ -716,6 +717,8 @@ GuessControlValues(void) ControlFile.checkPointCopy.oldestMultiDB = InvalidOid; ControlFile.checkPointCopy.time = (pg_time_t) time(NULL); ControlFile.checkPointCopy.oldestActiveXid = InvalidTransactionId; + ControlFile.checkPointCopy.nextXid = 0; + ControlFile.checkPointCopy.oldestXidWithEpochHavingUndo = 0; ControlFile.state = DB_SHUTDOWNED; ControlFile.time = (pg_time_t) time(NULL); @@ -808,6 +811,8 @@ PrintControlValues(bool guessed) ControlFile.checkPointCopy.oldestCommitTsXid); printf(_("Latest checkpoint's newestCommitTsXid:%u\n"), ControlFile.checkPointCopy.newestCommitTsXid); + printf(_("Latest checkpoint's oldestXidWithEpochHavingUndo:" UINT64_FORMAT "\n"), + ControlFile.checkPointCopy.oldestXidWithEpochHavingUndo); printf(_("Maximum data alignment: %u\n"), ControlFile.maxAlign); /* we don't print floatFormat since can't say much useful about it */ @@ -884,6 +889,8 @@ PrintNewControlValues(void) ControlFile.checkPointCopy.oldestXid); printf(_("OldestXID's DB: %u\n"), ControlFile.checkPointCopy.oldestXidDB); + printf(_("OldestXidWithEpochHavingUndo:" UINT64_FORMAT "\n"), + ControlFile.checkPointCopy.oldestXidWithEpochHavingUndo); } if (set_xid_epoch != -1) diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 4002847..e71a71e 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -29,7 +29,7 @@ * RmgrNames is an array of resource manager names, to make error messages * a bit nicer. 
*/ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undodesc) \ name, static const char *RmgrNames[RM_MAX_ID + 1] = { diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 938150d..3ef0a60 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -20,6 +20,7 @@ #include "access/nbtxlog.h" #include "access/rmgr.h" #include "access/spgxlog.h" +#include "access/undoaction_xlog.h" #include "access/undolog_xlog.h" #include "access/xact.h" #include "access/xlog_internal.h" @@ -33,7 +34,7 @@ #include "storage/standbydefs.h" #include "utils/relmapper.h" -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undodesc) \ { name, desc, identify}, const RmgrDescData RmgrDescTable[RM_MAX_ID + 1] = { diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h index c9b5c56..e1fb42a 100644 --- a/src/include/access/rmgr.h +++ b/src/include/access/rmgr.h @@ -19,7 +19,7 @@ typedef uint8 RmgrId; * Note: RM_MAX_ID must fit in RmgrId; widening that type will affect the XLOG * file format. 
*/ -#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask) \ +#define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,undo,undo_desc) \ symname, typedef enum RmgrIds diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 9c6fca4..eee7835 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -25,26 +25,27 @@ */ /* symbol name, textual name, redo, desc, identify, startup, cleanup */ -PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL) -PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL) -PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL) -PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL) -PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL) -PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL) -PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL) -PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL) -PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL) -PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask) -PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask) -PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, NULL, NULL, btree_mask) -PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask) -PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask) -PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask) -PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask) 
-PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask) -PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask) -PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL) -PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL) -PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask) -PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL) -PG_RMGR(RM_UNDOLOG_ID, "UndoLog", undolog_redo, undolog_desc, undolog_identify, NULL, NULL, NULL) +PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_CLOG_ID, "CLOG", clog_redo, clog_desc, clog_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_DBASE_ID, "Database", dbase_redo, dbase_desc, dbase_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_TBLSPC_ID, "Tablespace", tblspc_redo, tblspc_desc, tblspc_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_MULTIXACT_ID, "MultiXact", multixact_redo, multixact_desc, multixact_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_RELMAP_ID, "RelMap", relmap_redo, relmap_desc, relmap_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_STANDBY_ID, "Standby", standby_redo, standby_desc, standby_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_HEAP2_ID, "Heap2", heap2_redo, heap2_desc, heap2_identify, NULL, NULL, heap_mask, NULL, NULL) +PG_RMGR(RM_HEAP_ID, "Heap", heap_redo, heap_desc, heap_identify, NULL, NULL, heap_mask, NULL, NULL) +PG_RMGR(RM_BTREE_ID, "Btree", btree_redo, btree_desc, btree_identify, NULL, NULL, 
btree_mask, NULL, NULL) +PG_RMGR(RM_HASH_ID, "Hash", hash_redo, hash_desc, hash_identify, NULL, NULL, hash_mask, NULL, NULL) +PG_RMGR(RM_GIN_ID, "Gin", gin_redo, gin_desc, gin_identify, gin_xlog_startup, gin_xlog_cleanup, gin_mask, NULL, NULL) +PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_startup, gist_xlog_cleanup, gist_mask, NULL, NULL) +PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL, seq_mask, NULL, NULL) +PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup, spg_mask, NULL, NULL) +PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL, brin_mask, NULL, NULL) +PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL, NULL) +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_UNDOLOG_ID, "UndoLog", undolog_redo, undolog_desc, undolog_identify, NULL, NULL, NULL, NULL, NULL) +PG_RMGR(RM_UNDOACTION_ID, "UndoAction", undoaction_redo, undoaction_desc, undoaction_identify, NULL, NULL, NULL, NULL, NULL) diff --git a/src/include/access/transam.h b/src/include/access/transam.h index 83ec3f1..7b983ef 100644 --- a/src/include/access/transam.h +++ b/src/include/access/transam.h @@ -68,6 +68,10 @@ (AssertMacro(TransactionIdIsNormal(id1) && TransactionIdIsNormal(id2)), \ (int32) ((id1) - (id2)) > 0) +/* Extract xid from a value comprised of epoch and xid */ +#define GetXidFromEpochXid(epochxid) \ + ((uint32) (epochxid) & 0XFFFFFFFF) + /* ---------- * Object ID (OID) zero is InvalidOid. 
* diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 0e932da..fb05c3b 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -41,7 +41,7 @@ extern GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid, TimestampTz prepared_at, Oid owner, Oid databaseid); -extern void StartPrepare(GlobalTransaction gxact); +extern void StartPrepare(GlobalTransaction gxact, UndoRecPtr *, UndoRecPtr *); extern void EndPrepare(GlobalTransaction gxact); extern bool StandbyTransactionIdIsPrepared(TransactionId xid); diff --git a/src/include/access/undoaction.h b/src/include/access/undoaction.h new file mode 100644 index 0000000..5455259 --- /dev/null +++ b/src/include/access/undoaction.h @@ -0,0 +1,28 @@ +/*------------------------------------------------------------------------- + * + * undoaction.h + * undo action prototypes + * + * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/undoaction.h + * + *------------------------------------------------------------------------- + */ +#ifndef UNDOACTION_H +#define UNDOACTION_H + +#include "postgres.h" + +#include "access/undolog.h" +#include "access/undorecord.h" + +/* undo record information */ +typedef struct UndoRecInfo +{ + UndoRecPtr urp; /* undo recptr (undo record location). */ + UnpackedUndoRecord *uur; /* actual undo record. 
*/ +} UndoRecInfo; + +#endif diff --git a/src/include/access/undoaction_xlog.h b/src/include/access/undoaction_xlog.h new file mode 100644 index 0000000..bfc6418 --- /dev/null +++ b/src/include/access/undoaction_xlog.h @@ -0,0 +1,74 @@ +/*------------------------------------------------------------------------- + * + * undoaction_xlog.h + * undo action XLOG definitions + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/undoaction_xlog.h + * + *------------------------------------------------------------------------- + */ +#ifndef UNDOACTION_XLOG_H +#define UNDOACTION_XLOG_H + +#include "access/undolog.h" +#include "access/xlogreader.h" +#include "lib/stringinfo.h" +#include "storage/off.h" + +/* + * WAL record definitions for undoactions.c's WAL operations + */ +#define XLOG_UNDO_PAGE 0x00 +#define XLOG_UNDO_RESET_SLOT 0x10 +#define XLOG_UNDO_APPLY_PROGRESS 0x20 + +/* + * xl_undoaction_page flag values, 8 bits are available. + */ +#define XLU_PAGE_CONTAINS_TPD_SLOT (1<<0) +#define XLU_PAGE_CLEAR_VISIBILITY_MAP (1<<1) +#define XLU_CONTAINS_TPD_OFFSET_MAP (1<<2) +#define XLU_INIT_PAGE (1<<3) + +/* This is what we need to know about delete */ +typedef struct xl_undoaction_page +{ + UndoRecPtr urec_ptr; + TransactionId xid; + int trans_slot_id; /* transaction slot id */ +} xl_undoaction_page; + +#define SizeOfUndoActionPage (offsetof(xl_undoaction_page, trans_slot_id) + sizeof(int)) + +/* This is what we need to know about undo apply progress */ +typedef struct xl_undoapply_progress +{ + UndoRecPtr urec_ptr; + uint32 progress; +} xl_undoapply_progress; + +#define SizeOfUndoActionProgress (offsetof(xl_undoapply_progress, progress) + sizeof(uint32)) + +/* + * xl_undoaction_reset_slot flag values, 8 bits are available. 
+ */ +#define XLU_RESET_CONTAINS_TPD_SLOT (1<<0) + +/* This is what we need to know about delete */ +typedef struct xl_undoaction_reset_slot +{ + UndoRecPtr urec_ptr; + int trans_slot_id; /* transaction slot id */ + uint8 flags; +} xl_undoaction_reset_slot; + +#define SizeOfUndoActionResetSlot (offsetof(xl_undoaction_reset_slot, flags) + sizeof(uint8)) + +extern void undoaction_redo(XLogReaderState *record); +extern void undoaction_desc(StringInfo buf, XLogReaderState *record); +extern const char *undoaction_identify(uint8 info); + +#endif /* UNDOACTION_XLOG_H */ diff --git a/src/include/access/undodiscard.h b/src/include/access/undodiscard.h new file mode 100644 index 0000000..70d6408 --- /dev/null +++ b/src/include/access/undodiscard.h @@ -0,0 +1,31 @@ +/*------------------------------------------------------------------------- + * + * undoinsert.h + * undo discard definitions + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/undodiscard.h + * + *------------------------------------------------------------------------- + */ +#ifndef UNDODISCARD_H +#define UNDODISCARD_H + +#include "access/undolog.h" +#include "access/xlogdefs.h" +#include "catalog/pg_class.h" +#include "storage/lwlock.h" + +/* + * Discard the undo for all the transaction whose xid is smaller than xmin + * + * Check the DiscardInfo memory array for each slot (every undo log) , process + * the undo log for all the slot which have xid smaller than xmin or invalid + * xid. Fetch the record from the undo log transaction by transaction until we + * find the xid which is not smaller than xmin. 
+ */ +extern void UndoDiscard(TransactionId xmin, bool *hibernate); + +#endif /* UNDODISCARD_H */ diff --git a/src/include/access/undoinsert.h b/src/include/access/undoinsert.h index 2b73f9b..fda3082 100644 --- a/src/include/access/undoinsert.h +++ b/src/include/access/undoinsert.h @@ -96,6 +96,8 @@ extern void UndoSetPrepareSize(int max_prepare, UnpackedUndoRecord *undorecords, extern UndoRecPtr UndoGetPrevUndoRecptr(UndoRecPtr urp, uint16 prevlen); extern void UndoRecordOnUndoLogChange(UndoPersistence persistence); +extern void PrepareUpdateUndoActionProgress(UndoRecPtr urecptr, int progress); +extern void UndoRecordUpdateTransInfo(void); #endif /* UNDOINSERT_H */ diff --git a/src/include/access/undorecord.h b/src/include/access/undorecord.h index 85642ad..13512c9 100644 --- a/src/include/access/undorecord.h +++ b/src/include/access/undorecord.h @@ -20,6 +20,17 @@ #include "storage/buf.h" #include "storage/off.h" +typedef enum undorectype +{ + UNDO_INSERT, + UNDO_MULTI_INSERT, + UNDO_DELETE, + UNDO_INPLACE_UPDATE, + UNDO_UPDATE, + UNDO_XID_LOCK_ONLY, + UNDO_XID_MULTI_LOCK_ONLY, + UNDO_ITEMID_UNUSED +} undorectype; /* * Every undo record begins with an UndoRecordHeader structure, which is @@ -30,6 +41,7 @@ */ typedef struct UndoRecordHeader { + RmgrId urec_rmid; /* RMGR [XXX:TODO: this creates an alignment hole?] 
*/ uint8 urec_type; /* record type code */ uint8 urec_info; /* flag bits */ uint16 urec_prevlen; /* length of previous record in bytes */ @@ -148,6 +160,7 @@ typedef struct UndoRecordPayload */ typedef struct UnpackedUndoRecord { + RmgrId uur_rmid; /* rmgr ID */ uint8 uur_type; /* record type code */ uint8 uur_info; /* flag bits */ uint16 uur_prevlen; /* length of previous record */ diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 73394c5..ad8cabe 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -419,6 +419,7 @@ extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int xactflags, TransactionId twophase_xid, const char *twophase_gid); extern void xact_redo(XLogReaderState *record); +extern void XactPerformUndoActionsIfPending(void); /* xactdesc.c */ extern void xact_desc(StringInfo buf, XLogReaderState *record); diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 8cfcd44..89dc2b5 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -192,6 +192,9 @@ extern bool XLOG_DEBUG; #define XLOG_INCLUDE_ORIGIN 0x01 /* include the replication origin */ #define XLOG_MARK_UNIMPORTANT 0x02 /* record not important for durability */ +/* Generate a 64-bit xid by using epoch and 32-bit xid. 
*/ +#define MakeEpochXid(epoch, xid) \ + ((epoch << 32) | (xid)) /* Checkpoint statistics */ typedef struct CheckpointStatsData diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index 30610b3..9226cff 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -21,8 +21,10 @@ #include "access/xlogdefs.h" #include "access/xlogreader.h" +#include "access/undorecord.h" #include "datatype/timestamp.h" #include "lib/stringinfo.h" +#include "nodes/pg_list.h" #include "pgtime.h" #include "storage/block.h" #include "storage/relfilenode.h" @@ -294,9 +296,14 @@ typedef struct RmgrData void (*rm_startup) (void); void (*rm_cleanup) (void); void (*rm_mask) (char *pagedata, BlockNumber blkno); + bool (*rm_undo) (List *luinfo, UndoRecPtr urec_ptr, Oid reloid, + TransactionId xid, BlockNumber blkno, + bool blk_chain_complete, bool rellock, + int options); + void (*rm_undo_desc) (StringInfo buf, UnpackedUndoRecord *record); } RmgrData; -extern const RmgrData RmgrTable[]; +extern PGDLLIMPORT const RmgrData RmgrTable[]; /* * Exported to support xlog switching from checkpointer diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index 773d9e6..6918efc 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -61,6 +61,13 @@ typedef struct CheckPoint * set to InvalidTransactionId. */ TransactionId oldestActiveXid; + + /* + * Oldest transaction id with epoc which is having undo. Include this value + * in the checkpoint record so that whenever server starts we get proper + * value. 
+ */ + uint64 oldestXidWithEpochHavingUndo; } CheckPoint; /* XLOG info values for XLOG rmgr */ diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index d6b32c0..549bb38 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -245,6 +245,7 @@ extern PGDLLIMPORT bool allowSystemTableMods; extern PGDLLIMPORT int work_mem; extern PGDLLIMPORT int maintenance_work_mem; extern PGDLLIMPORT int max_parallel_maintenance_workers; +extern PGDLLIMPORT int rollback_overflow_size; extern int VacuumCostPageHit; extern int VacuumCostPageMiss; diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 763379e..7c5f05e 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -749,6 +749,7 @@ typedef enum BackendState #define PG_WAIT_TIMEOUT 0x09000000U #define PG_WAIT_IO 0x0A000000U #define PG_WAIT_PAGE_TRANS_SLOT 0x0B000000U +#define PG_WAIT_ROLLBACK_HT 0x0C000000U /* ---------- * Wait Events - Activity @@ -774,6 +775,8 @@ typedef enum WAIT_EVENT_WAL_RECEIVER_MAIN, WAIT_EVENT_WAL_SENDER_MAIN, WAIT_EVENT_WAL_WRITER_MAIN, + WAIT_EVENT_UNDO_DISCARD_WORKER_MAIN, + WAIT_EVENT_UNDO_LAUNCHER_MAIN } WaitEventActivity; /* ---------- diff --git a/src/include/postmaster/discardworker.h b/src/include/postmaster/discardworker.h new file mode 100644 index 0000000..f00c6c4 --- /dev/null +++ b/src/include/postmaster/discardworker.h @@ -0,0 +1,25 @@ +/*------------------------------------------------------------------------- + * + * discardworker.h + * Exports from postmaster/discardworker.c. + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/postmaster/discardworker.h + * + *------------------------------------------------------------------------- + */ +#ifndef _DISCARDWORKER_H +#define _DISCARDWORKER_H + +/* + * This function will perform multiple actions based on need. 
(a) retrieve + * transactions which have become all-visible and truncate the associated undo + * logs or will increment the tail pointer. (b) drop the buffers corresponding + * to truncated pages. + */ +extern void DiscardWorkerMain(Datum main_arg) pg_attribute_noreturn(); +extern void DiscardWorkerRegister(void); + +#endif /* _DISCARDWORKER_H */ diff --git a/src/include/postmaster/undoloop.h b/src/include/postmaster/undoloop.h new file mode 100644 index 0000000..21d91c3 --- /dev/null +++ b/src/include/postmaster/undoloop.h @@ -0,0 +1,89 @@ +/*------------------------------------------------------------------------- + * + * undoloop.h + * Exports from postmaster/undoloop.c. + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * + * src/include/postmaster/undoloop.h + * + *------------------------------------------------------------------------- + */ +#ifndef _UNDOLOOP_H +#define _UNDOLOOP_H + +#include "access/undoinsert.h" +#include "utils/hsearch.h" +#include "utils/relcache.h" + + +/* Various options while executing the undo actions for the page. */ +#define UNDO_ACTION_UPDATE_TPD 0x0001 + +/* Remembers the last seen RecentGlobalXmin */ +TransactionId latestRecentGlobalXmin; + +/* + * This function will read the undo records starting from the undo + * from_urecptr till to_urecptr and if to_urecptr is invalid then till the + * first undo location of transaction. This also discards the buffers by + * calling DropUndoBuffers for which undo log is removed. This function + * can be used by RollbackToSavePoint, by Rollback, by undoworker to complete + * the work of errored out transactions or when there is an error in single + * user mode. 
+ */ +extern void execute_undo_actions(UndoRecPtr from_urecptr, + UndoRecPtr to_urecptr, bool nopartial, bool rewind, bool rellock); +extern void process_and_execute_undo_actions_page(UndoRecPtr from_urecptr, + Relation rel, Buffer buffer, uint32 epoch, + TransactionId xid, int slot_no); + +/* + * This function will be responsible to truncate the undo logs + * for transactions that become all-visible after RecentGlobalXmin has + * advanced (value is different than latestRecentGlobalXmin). The easiest + * way could be to traverse the undo log array that contains least transaction + * id for that undo log and see if it precedes RecentGlobalXmin, then start + * discarding the undo log for that transaction (moving the tail pointer of + * undo log) till it finds the transaction which is not all-visible. This also + * discards the buffers by calling ForgetBuffer for which undo log is + * removed. This function can be invoked by undoworker or after commit in + * single user mode. + */ +extern void recover_undo_pages(); + +/* + * To increase the efficiency of the zheap system, we create a hash table for + * the rollbacks. All the rollback requests exceeding certain threshold, are + * pushed to this table. Undo worker starts reading the entries from this hash + * table one at a time, performs undo actions related to the respective xid and + * removes them from the hash table. This way backend is free from performing the + * undo actions in case of heavy rollbacks. The data structures and the routines + * required for this infrastructure are as follows. + */ + +/* This is the data structure for each hash table entry for rollbacks. 
*/ +typedef struct RollbackHashEntry +{ + UndoRecPtr start_urec_ptr; + UndoRecPtr end_urec_ptr; + Oid dbid; +} RollbackHashEntry; + +extern bool RollbackHTIsFull(void); + +/* To push the rollback requests from backend to the respective hash table */ +extern bool PushRollbackReq(UndoRecPtr start_urec_ptr, UndoRecPtr end_urec_ptr, + Oid dbid); + +/* To perform the undo actions reading from the hash table */ +extern void RollbackFromHT(Oid dbid); +/* To calculate the size of the hash table size for rollabcks. */ +extern int RollbackHTSize(void); + +/* To initialize the hash table in shared memory for rollbacks. */ +extern void InitRollbackHashTable(void); +extern List *RollbackHTGetDBList(void); +extern bool ConditionTransactionUndoActionLock(TransactionId xid); +extern void TransactionUndoActionLockRelease(TransactionId xid); +#endif /* _UNDOLOOP_H */ diff --git a/src/include/postmaster/undoworker.h b/src/include/postmaster/undoworker.h new file mode 100644 index 0000000..c277e01 --- /dev/null +++ b/src/include/postmaster/undoworker.h @@ -0,0 +1,39 @@ +/*------------------------------------------------------------------------- + * + * undoworker.h + * Exports from postmaster/undoworker.c. + * + * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/postmaster/undoworker.h + * + *------------------------------------------------------------------------- + */ +#ifndef _UNDOWORKER_H +#define _UNDOWORKER_H + +/* GUC options */ +/* undo worker sleep time between rounds */ +extern int UndoWorkerDelay; + +/* + * This function will perform multiple actions based on need. (a) retreive + * transaction and its corresponding undopoiter from shared memory queue and + * call undoloop to perform undo actions. After applying all the undo records + * for a particular transaction, it will increment the tail pointer in undo log. 
+ * (b) it needs to retrieve transactions which have become all-visible and truncate + * the associated undo logs or will increment the tail pointer. (c) udjust the + * number of undo workers based on the work required to perform undo actions + * (it could be size of shared memory queue containing transactions that needs + * aborts). (d) drop the buffers corresponding to truncated pages (e) Sleep for + * UndoWorkerDelay, if there is no more work. + */ +extern void UndoWorkerMain(Datum main_arg) pg_attribute_noreturn(); +extern void UndoLauncherRegister(void); +extern void UndoLauncherShmemInit(void); +extern Size UndoLauncherShmemSize(void); +extern void UndoLauncherMain(Datum main_arg); +extern void UndoWorkerMain(Datum main_arg); + +#endif /* _UNDOWORKER_H */ diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index a37fda7..675761c 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -150,6 +150,8 @@ typedef enum LockTagType LOCKTAG_VIRTUALTRANSACTION, /* virtual transaction (ditto) */ /* ID info for a virtual transaction is its VirtualTransactionId */ LOCKTAG_SPECULATIVE_TOKEN, /* speculative insertion Xid and token */ + /* ID info for an transaction undoaction is transaction id */ + LOCKTAG_TRANSACTION_UNDOACTION, /* transaction (waiting for undoaction) */ /* ID info for a transaction is its TransactionId */ LOCKTAG_OBJECT, /* non-relation database object */ /* ID info for an object is DB OID + CLASS OID + OBJECT OID + SUBID */ @@ -246,6 +248,14 @@ typedef struct LOCKTAG (locktag).locktag_type = LOCKTAG_SPECULATIVE_TOKEN, \ (locktag).locktag_lockmethodid = DEFAULT_LOCKMETHOD) +#define SET_LOCKTAG_TRANSACTION_UNDOACTION(locktag,xid) \ + ((locktag).locktag_field1 = (xid), \ + (locktag).locktag_field2 = 0, \ + (locktag).locktag_field3 = 0, \ + (locktag).locktag_field4 = 0, \ + (locktag).locktag_type = LOCKTAG_TRANSACTION_UNDOACTION, \ + (locktag).locktag_lockmethodid = DEFAULT_LOCKMETHOD) + #define 
SET_LOCKTAG_OBJECT(locktag,dboid,classoid,objoid,objsubid) \ ((locktag).locktag_field1 = (dboid), \ (locktag).locktag_field2 = (classoid), \ diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index cb613c8..f6b9d98 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -270,6 +270,8 @@ typedef struct PROC_HDR int startupProcPid; /* Buffer id of the buffer that Startup process waits for pin on, or -1 */ int startupBufferPinWaitBufId; + /* Oldest transaction id which is having undo. */ + pg_atomic_uint64 oldestXidWithEpochHavingUndo; } PROC_HDR; extern PGDLLIMPORT PROC_HDR *ProcGlobal; -- 1.8.3.1