From b40910de70971d529c05bd11b0cf38d98bde43bc Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Tue, 22 Sep 2020 22:53:55 +0900 Subject: [PATCH v26 07/11] Automatic foreign transaciton resolution on COMMIT/ROLLBACK PREPARED. Co-authored-by: Masahiko Sawada, Ashutosh Bapat --- src/backend/access/fdwxact/Makefile | 4 +- src/backend/access/fdwxact/fdwxact.c | 553 ++++++++++++++++- src/backend/access/fdwxact/launcher.c | 556 ++++++++++++++++++ src/backend/access/fdwxact/resolver.c | 406 +++++++++++++ src/backend/access/transam/twophase.c | 50 +- src/backend/access/transam/xact.c | 3 + src/backend/postmaster/bgworker.c | 8 + src/backend/postmaster/pgstat.c | 9 + src/backend/postmaster/postmaster.c | 14 +- src/backend/replication/syncrep.c | 15 +- src/backend/storage/ipc/ipci.c | 3 + src/backend/storage/lmgr/lwlocknames.txt | 2 + src/backend/storage/lmgr/proc.c | 8 + src/backend/tcop/postgres.c | 14 + src/backend/utils/misc/guc.c | 37 ++ src/backend/utils/misc/postgresql.conf.sample | 12 + src/include/access/fdwxact.h | 26 + src/include/access/fdwxact_launcher.h | 28 + src/include/access/fdwxact_resolver.h | 23 + src/include/access/resolver_internal.h | 63 ++ src/include/catalog/pg_proc.dat | 5 + src/include/pgstat.h | 3 + src/include/replication/syncrep.h | 2 +- src/include/storage/proc.h | 12 + src/include/utils/guc_tables.h | 2 + 25 files changed, 1817 insertions(+), 41 deletions(-) create mode 100644 src/backend/access/fdwxact/launcher.c create mode 100644 src/backend/access/fdwxact/resolver.c create mode 100644 src/include/access/fdwxact_launcher.h create mode 100644 src/include/access/fdwxact_resolver.h create mode 100644 src/include/access/resolver_internal.h diff --git a/src/backend/access/fdwxact/Makefile b/src/backend/access/fdwxact/Makefile index aacab1d729..59e8d451b5 100644 --- a/src/backend/access/fdwxact/Makefile +++ b/src/backend/access/fdwxact/Makefile @@ -12,6 +12,8 @@ subdir = src/backend/access/fdwxact top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -OBJS = fdwxact.o +OBJS = fdwxact.o \ + resolver.o \ + launcher.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/access/fdwxact/fdwxact.c b/src/backend/access/fdwxact/fdwxact.c index c6f6a92752..e3b5937054 100644 --- a/src/backend/access/fdwxact/fdwxact.c +++ b/src/backend/access/fdwxact/fdwxact.c @@ -15,8 +15,9 @@ * To achieve commit among all foreign servers atomically, the global transaction * manager supports two-phase commit protocol, which is a type of atomic commitment * protocol(ACP). Foreign servers whose FDW implements prepare API are prepared - * when PREPARE TRANSACTION. To commit or rollback prepared foreign transactions - * we can use pg_resolve_foreign_xact() function. + * when PREPARE TRANSACTION. On COMMIT PREPARED or ROLLBACK PREPARED the local + * transaction, we collect the involved foreign transaction and wait for the resolver + * process committing or rolling back the foreign transactions. * * Two-phase commit protocol is crash-safe. We WAL logs the foreign transaction * information. @@ -70,7 +71,10 @@ #include #include "access/fdwxact.h" +#include "access/fdwxact_resolver.h" +#include "access/fdwxact_launcher.h" #include "access/twophase.h" +#include "access/resolver_internal.h" #include "access/xact.h" #include "access/xlog.h" #include "access/xloginsert.h" @@ -83,11 +87,14 @@ #include "storage/ipc.h" #include "storage/latch.h" #include "storage/lock.h" +#include "storage/pmsignal.h" #include "storage/procarray.h" +#include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/memutils.h" #include "utils/rel.h" +#include "utils/ps_status.h" /* Check the FdwXactParticipant is capable of two-phase commit */ #define ServerSupportTransactionCallback(fdw_part) \ @@ -142,25 +149,35 @@ typedef struct FdwXactParticipant /* * List of foreign transactions involved in the transaction. A member of * participants must support both commit and rollback APIs. + + * FdwXactParticipants_tmp is used to update FdwXactParticipants atomically + * when executing COMMIT/ROLLBACK PREPARED command. In COMMIT PREPARED case, + * we don't want to rollback foreign transactions even if an error occurs, + * because the local prepared transaction never turn over rollback in that + * case. However, preparing FdwXactParticipants might be lead an error + * because of calling palloc() inside. So we prepare FdwXactParticipants in + * two phase. In the first phase, CollectFdwXactParticipants(), we collect + * all foreign transactions associated with the local prepared transactions + * and kept them in FdwXactParticipants_tmp. Even if an error occurs during + * that, we don't rollback them. In the second phase, SetFdwXactParticipants(), + * we replace FdwXactParticipants_tmp with FdwXactParticipants and hold them. */ static List *FdwXactParticipants = NIL; - -/* Keep track of registering process exit call back. */ -static bool fdwXactExitRegistered = false; +static List *FdwXactParticipants_tmp = NIL; /* Guc parameter */ int max_prepared_foreign_xacts = 0; +int max_foreign_xact_resolvers = 0; -static void AtProcExit_FdwXact(int code, Datum arg); static void FdwXactPrepareForeignTransactions(void); -static void ForgetAllFdwXactParticipants(void); static void FdwXactParticipantEndTransaction(FdwXactParticipant *fdw_part, bool commit); static FdwXact FdwXactInsertFdwXactEntry(TransactionId xid, FdwXactParticipant *fdw_part); -static void FdwXactResolveFdwXacts(int *fdwxact_idxs, int nfdwxacts); static void FdwXactComputeRequiredXmin(void); static FdwXactStatus FdwXactGetTransactionFate(TransactionId xid); +static void FdwXactQueueInsert(PGPROC *waiter); +static void FdwXactCancelWait(void); static void FdwXactResolveOneFdwXact(FdwXact fdwxact); static void FdwXactRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn); static void FdwXactRedoRemove(Oid dbid, TransactionId xid, Oid serverid, @@ -181,6 +198,10 @@ static char *get_fdwxact_identifier(FdwXactParticipant *fdw_part, TransactionId xid); static int get_fdwxact(Oid dbid, TransactionId xid, Oid serverid, Oid userid); +#ifdef USE_ASSERT_CHECKING +static bool FdwXactQueueIsOrderedByTimestamp(void); +#endif + /* * Calculates the size of shared memory allocated for maintaining foreign * prepared transaction entries. @@ -267,13 +288,6 @@ FdwXactRegisterXact(Oid serverid, Oid userid) } } - /* on first call, register the exit hook */ - if (!fdwXactExitRegistered) - { - before_shmem_exit(AtProcExit_FdwXact, 0); - fdwXactExitRegistered = true; - } - routine = GetFdwRoutineByServerId(serverid); /* Foreign server must implement both callback */ @@ -338,6 +352,7 @@ create_fdwxact_participant(Oid serverid, Oid userid, FdwRoutine *routine) fdw_part->commit_foreign_xact_fn = routine->CommitForeignTransaction; fdw_part->rollback_foreign_xact_fn = routine->RollbackForeignTransaction; fdw_part->prepare_foreign_xact_fn = routine->PrepareForeignTransaction; + fdw_part->get_prepareid_fn = routine->GetPrepareId; return fdw_part; } @@ -427,6 +442,9 @@ FdwXactPrepareForeignTransactions(void) static char * get_fdwxact_identifier(FdwXactParticipant *fdw_part, TransactionId xid) { + char *id; + int id_len = 0; + /* * If FDW doesn't provide the callback function, generate an unique * identifier. @@ -589,6 +607,7 @@ insert_fdwxact(Oid dbid, TransactionId xid, Oid serverid, Oid userid, FdwXactCtl->fdwxacts[FdwXactCtl->num_fdwxacts++] = fdwxact; fdwxact->status = FDWXACT_STATUS_PREPARING; + fdwxact->proc = MyProc; fdwxact->local_xid = xid; fdwxact->dbid = dbid; fdwxact->serverid = serverid; @@ -645,6 +664,7 @@ remove_fdwxact(FdwXact fdwxact) /* Reset informations */ fdwxact->status = FDWXACT_STATUS_INVALID; + fdwxact->proc = NULL; fdwxact->locking_backend = InvalidBackendId; fdwxact->valid = false; fdwxact->ondisk = false; @@ -722,16 +742,417 @@ PrePrepare_FdwXact(void) /* Prepare transactions on participating foreign servers */ FdwXactPrepareForeignTransactions(); + /* + * We keep prepared foreign transaction participants to rollback them in + * case of failure. + */ +} + +/* + * After PREPARE TRANSACTION, we forget all participants. + */ +void +PostPrepare_FdwXact(void) +{ ForgetAllFdwXactParticipants(); } /* - * When the process exits, forget all the entries. + * Collect all foreign transactions associated with the given xid if it's a prepared + * transaction. Return true if COMMIT PREPARED or ROLLBACK PREPARED needs to wait for + * all foreign transactions to be resolved. The collected foreign transactions are + * kept in FdwXactParticipants_tmp. The caller must call SetFdwXactParticipants() + * later if this function returns true. */ -static void -AtProcExit_FdwXact(int code, Datum arg) +bool +CollectFdwXactParticipants(TransactionId xid) +{ + MemoryContext old_ctx; + + Assert(FdwXactParticipants_tmp == NIL); + + old_ctx = MemoryContextSwitchTo(TopTransactionContext); + + LWLockAcquire(FdwXactLock, LW_SHARED); + for (int i = 0; i < FdwXactCtl->num_fdwxacts; i++) + { + FdwXactParticipant *fdw_part; + FdwXact fdwxact = FdwXactCtl->fdwxacts[i]; + FdwRoutine *routine; + + if (!fdwxact->valid || fdwxact->local_xid != xid) + continue; + + routine = GetFdwRoutineByServerId(fdwxact->serverid); + fdw_part = create_fdwxact_participant(fdwxact->serverid, fdwxact->userid, + routine); + fdw_part->fdwxact = fdwxact; + + /* Add to the participants list */ + FdwXactParticipants_tmp = lappend(FdwXactParticipants_tmp, fdw_part); + } + LWLockRelease(FdwXactLock); + + MemoryContextSwitchTo(old_ctx); + + /* Return true if we collect at least one foreign transaction */ + return (FdwXactParticipants_tmp != NIL); +} + +/* + * Set the collected foreign transactions to the participants of this transaction, + * and hold them. This function must be called after CollectFdwXactParticipants(). + */ +void +SetFdwXactParticipants(TransactionId xid) +{ + ListCell *lc; + + Assert(FdwXactParticipants_tmp != NIL); + Assert(FdwXactParticipants == NIL); + + FdwXactParticipants = FdwXactParticipants_tmp; + FdwXactParticipants_tmp = NIL; + + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + foreach(lc, FdwXactParticipants) + { + FdwXactParticipant *fdw_part = (FdwXactParticipant *) lfirst(lc); + + Assert(ServerSupportTwophaseCommit(fdw_part)); + Assert(fdw_part->fdwxact->status == FDWXACT_STATUS_PREPARED); + Assert(fdw_part->fdwxact->locking_backend == InvalidBackendId); + Assert(!fdw_part->fdwxact->proc); + + /* Hold the fdwxact entry and set the status */ + fdw_part->fdwxact->locking_backend = MyBackendId; + fdw_part->fdwxact->proc = MyProc; + } + LWLockRelease(FdwXactLock); +} + +void +FdwXactCleanupAtProcExit(void) { ForgetAllFdwXactParticipants(); + if (!SHMQueueIsDetached(&(MyProc->fdwXactLinks))) + { + LWLockAcquire(FdwXactResolutionLock, LW_EXCLUSIVE); + SHMQueueDelete(&(MyProc->fdwXactLinks)); + LWLockRelease(FdwXactResolutionLock); + } +} + +/* + * Wait for its all foreign transactions to be resolved. + * + * Initially backends start in state FDWXACT_NOT_WAITING and then change + * that state to FDWXACT_WAITING before adding ourselves to the wait queue. + * During FdwXactResolveForeignTransaction a fdwxact resolver changes the + * state to FDWXACT_WAIT_COMPLETE once all foreign transactions are resolved. + * This backend then resets its state to FDWXACT_NOT_WAITING. + * If a resolver fails to resolve the waiting transaction it moves us to + * the retry queue. + * + * This function is inspired by SyncRepWaitForLSN. + */ +void +FdwXactWaitForResolution(TransactionId wait_xid, bool commit) +{ + ListCell *lc; + char *new_status = NULL; + const char *old_status; + + Assert(FdwXactCtl != NULL); + Assert(TransactionIdIsValid(wait_xid)); + Assert(SHMQueueIsDetached(&(MyProc->fdwXactLinks))); + Assert(MyProc->fdwXactState == FDWXACT_NOT_WAITING); + + /* Quick exit if we don't have any participants */ + if (FdwXactParticipants == NIL) + return; + + /* Set foreign transaction status */ + foreach(lc, FdwXactParticipants) + { + FdwXactParticipant *fdw_part = (FdwXactParticipant *) lfirst(lc); + + if (!fdw_part->fdwxact) + continue; + + Assert(fdw_part->fdwxact->locking_backend == MyBackendId); + Assert(fdw_part->fdwxact->proc == MyProc); + + SpinLockAcquire(&(fdw_part->fdwxact->mutex)); + fdw_part->fdwxact->status = commit + ? FDWXACT_STATUS_COMMITTING + : FDWXACT_STATUS_ABORTING; + SpinLockRelease(&(fdw_part->fdwxact->mutex)); + } + + /* Set backend status and enqueue itself to the active queue */ + LWLockAcquire(FdwXactResolutionLock, LW_EXCLUSIVE); + MyProc->fdwXactState = FDWXACT_WAITING; + MyProc->fdwXactWaitXid = wait_xid; + MyProc->fdwXactNextResolutionTs = GetCurrentTransactionStopTimestamp(); + FdwXactQueueInsert(MyProc); + Assert(FdwXactQueueIsOrderedByTimestamp()); + LWLockRelease(FdwXactResolutionLock); + + /* Launch a resolver process if not yet, or wake up */ + FdwXactLaunchOrWakeupResolver(); + + /* + * Alter ps display to show waiting for foreign transaction resolution. + */ + if (update_process_title) + { + int len; + + old_status = get_ps_display(&len); + new_status = (char *) palloc(len + 31 + 1); + memcpy(new_status, old_status, len); + sprintf(new_status + len, " waiting for resolution %d", wait_xid); + set_ps_display(new_status); + new_status[len] = '\0'; /* truncate off "waiting ..." */ + } + + /* Wait for all foreign transactions to be resolved */ + for (;;) + { + /* Must reset the latch before testing state */ + ResetLatch(MyLatch); + + /* + * Acquiring the lock is not needed, the latch ensures proper + * barriers. If it looks like we're done, we must really be done, + * because once resolver changes the state to FDWXACT_WAIT_COMPLETE, + * it will never update it again, so we can't be seeing a stale value + * in that case. + */ + if (MyProc->fdwXactState == FDWXACT_WAIT_COMPLETE) + { + ForgetAllFdwXactParticipants(); + break; + } + + /* + * If a wait for foreign transaction resolution is pending, we can + * neither acknowledge the commit nor raise ERROR or FATAL. The + * latter would lead the client to believe that the distributed + * transaction aborted, which is not true: it's already committed + * locally. The former is no good either: the client has requested + * committing a distributed transaction, and is entitled to assume + * that a acknowledged commit is also commit on all foreign servers, + * which might not be true. So in this case we issue a WARNING (which + * some clients may be able to interpret) and shut off further output. + * We do NOT reset PorcDiePending, so that the process will die after + * the commit is cleaned up. + */ + if (ProcDiePending) + { + ereport(WARNING, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("canceling the wait for resolving foreign transaction and terminating connection due to administrator command"), + errdetail("The transaction has already committed locally, but might not have been committed on the foreign server."))); + whereToSendOutput = DestNone; + FdwXactCancelWait(); + break; + } + + /* + * If a query cancel interrupt arrives we just terminate the wait with + * a suitable warning. The foreign transactions can be orphaned but + * the foreign xact resolver can pick up them and tries to resolve + * them later. + */ + if (QueryCancelPending) + { + QueryCancelPending = false; + ereport(WARNING, + (errmsg("canceling wait for resolving foreign transaction due to user request"), + errdetail("The transaction has already committed locally, but might not have been committed on the foreign server."))); + FdwXactCancelWait(); + break; + } + + /* + * If the postmaster dies, we'll probably never get an + * acknowledgement, because all the resolver processes will exit. So + * just bail out. + */ + if (!PostmasterIsAlive()) + { + ProcDiePending = true; + whereToSendOutput = DestNone; + FdwXactCancelWait(); + break; + } + + /* + * Wait on latch. Any condition that should wake us up will set the + * latch, so no need for timeout. + */ + WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1, + WAIT_EVENT_FDWXACT_RESOLUTION); + } + + pg_read_barrier(); + Assert(SHMQueueIsDetached(&(MyProc->fdwXactLinks))); + MyProc->fdwXactState = FDWXACT_NOT_WAITING; + + if (new_status) + { + set_ps_display(new_status); + pfree(new_status); + } +} + +/* + * Return one backend that connects to my database and is waiting for + * resolution. + */ +PGPROC * +FdwXactGetWaiter(TimestampTz now, TimestampTz *nextResolutionTs_p, + TransactionId *waitXid_p) +{ + PGPROC *proc; + bool found = false; + + Assert(LWLockHeldByMe(FdwXactResolutionLock)); + Assert(FdwXactQueueIsOrderedByTimestamp()); + + /* Initialize variables */ + *nextResolutionTs_p = -1; + *waitXid_p = InvalidTransactionId; + + proc = (PGPROC *) SHMQueueNext(&(FdwXactRslvCtl->fdwxact_queue), + &(FdwXactRslvCtl->fdwxact_queue), + offsetof(PGPROC, fdwXactLinks)); + + while (proc) + { + if (proc->databaseId == MyDatabaseId) + { + if (proc->fdwXactNextResolutionTs <= now) + { + /* Found a waiting process */ + found = true; + *waitXid_p = proc->fdwXactWaitXid; + } + else + /* Found a waiting process supposed to be processed later */ + *nextResolutionTs_p = proc->fdwXactNextResolutionTs; + + break; + } + + proc = (PGPROC *) SHMQueueNext(&(FdwXactRslvCtl->fdwxact_queue), + &(proc->fdwXactLinks), + offsetof(PGPROC, fdwXactLinks)); + } + + return found ? proc : NULL; +} + +/* + * Return true if there are at least one backend in the wait queue. The caller + * must hold FdwXactResolutionLock. + */ +bool +FdwXactWaiterExists(Oid dbid) +{ + PGPROC *proc; + + Assert(LWLockHeldByMeInMode(FdwXactResolutionLock, LW_SHARED)); + + proc = (PGPROC *) SHMQueueNext(&(FdwXactRslvCtl->fdwxact_queue), + &(FdwXactRslvCtl->fdwxact_queue), + offsetof(PGPROC, fdwXactLinks)); + + while (proc) + { + if (proc->databaseId == dbid) + return true; + + proc = (PGPROC *) SHMQueueNext(&(FdwXactRslvCtl->fdwxact_queue), + &(proc->fdwXactLinks), + offsetof(PGPROC, fdwXactLinks)); + } + + return false; +} + +/* + * Insert the waiter to the wait queue in fdwXactNextResolutoinTs order. + */ +static void +FdwXactQueueInsert(PGPROC *waiter) +{ + PGPROC *proc; + + Assert(LWLockHeldByMeInMode(FdwXactResolutionLock, LW_EXCLUSIVE)); + + proc = (PGPROC *) SHMQueuePrev(&(FdwXactRslvCtl->fdwxact_queue), + &(FdwXactRslvCtl->fdwxact_queue), + offsetof(PGPROC, fdwXactLinks)); + + while (proc) + { + if (proc->fdwXactNextResolutionTs < waiter->fdwXactNextResolutionTs) + break; + + proc = (PGPROC *) SHMQueuePrev(&(FdwXactRslvCtl->fdwxact_queue), + &(proc->fdwXactLinks), + offsetof(PGPROC, fdwXactLinks)); + } + + if (proc) + SHMQueueInsertAfter(&(proc->fdwXactLinks), &(waiter->fdwXactLinks)); + else + SHMQueueInsertAfter(&(FdwXactRslvCtl->fdwxact_queue), &(waiter->fdwXactLinks)); +} + +#ifdef USE_ASSERT_CHECKING +static bool +FdwXactQueueIsOrderedByTimestamp(void) +{ + PGPROC *proc; + TimestampTz lastTs; + + proc = (PGPROC *) SHMQueueNext(&(FdwXactRslvCtl->fdwxact_queue), + &(FdwXactRslvCtl->fdwxact_queue), + offsetof(PGPROC, fdwXactLinks)); + lastTs = 0; + + while (proc) + { + + if (proc->fdwXactNextResolutionTs < lastTs) + return false; + + lastTs = proc->fdwXactNextResolutionTs; + + proc = (PGPROC *) SHMQueueNext(&(FdwXactRslvCtl->fdwxact_queue), + &(proc->fdwXactLinks), + offsetof(PGPROC, fdwXactLinks)); + } + + return true; +} +#endif + +/* + * Acquire FdwXactResolutionLock and cancel any wait currently in progress. + */ +static void +FdwXactCancelWait(void) +{ + LWLockAcquire(FdwXactResolutionLock, LW_EXCLUSIVE); + if (!SHMQueueIsDetached(&(MyProc->fdwXactLinks))) + SHMQueueDelete(&(MyProc->fdwXactLinks)); + MyProc->fdwXactState = FDWXACT_NOT_WAITING; + LWLockRelease(FdwXactResolutionLock); } /* @@ -771,14 +1192,17 @@ FdwXactParticipantEndTransaction(FdwXactParticipant *fdw_part, bool commit) * transaction so that local transaction id of such unresolved foreign transaction * is not truncated. */ -static void +void ForgetAllFdwXactParticipants(void) { ListCell *cell; int nlefts = 0; if (FdwXactParticipants == NIL) + { + Assert(FdwXactParticipants_tmp == NIL); return; + } foreach(cell, FdwXactParticipants) { @@ -802,7 +1226,9 @@ ForgetAllFdwXactParticipants(void) LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); if (fdwxact->valid) { - fdwxact->locking_backend = InvalidBackendId; + if (fdwxact->locking_backend == MyBackendId) + fdwxact->locking_backend = InvalidBackendId; + fdwxact->proc = NULL; nlefts++; } LWLockRelease(FdwXactLock); @@ -819,7 +1245,9 @@ ForgetAllFdwXactParticipants(void) } list_free_deep(FdwXactParticipants); + list_free_deep(FdwXactParticipants_tmp); FdwXactParticipants = NIL; + FdwXactParticipants_tmp = NIL; } /* @@ -851,6 +1279,12 @@ AtEOXact_FdwXact(bool is_commit) continue; } + /* + * We never reach here in commit case since all foreign transaction + * should be committed in that case. + */ + Assert(!is_commit); + /* * Abort the foreign transaction. For participants whose status is * FDWXACT_STATUS_PREPARING, we close the transaction in one-phase. @@ -868,13 +1302,16 @@ AtEOXact_FdwXact(bool is_commit) } /* - * Resolve foreign transactions at the give indexes. + * Resolve foreign transactions at the give indexes. If 'waiter' is not NULL, + * we release the waiter after we resolved all of the given foreign transactions + * Also on failure, we re-enqueue the waiting backend after incremented the next + * resolution time. * * The caller must hold the given foreign transactions in advance to prevent * concurrent update. */ -static void -FdwXactResolveFdwXacts(int *fdwxact_idxs, int nfdwxacts) +void +FdwXactResolveFdwXacts(int *fdwxact_idxs, int nfdwxacts, PGPROC *waiter) { for (int i = 0; i < nfdwxacts; i++) { @@ -882,7 +1319,34 @@ FdwXactResolveFdwXacts(int *fdwxact_idxs, int nfdwxacts) CHECK_FOR_INTERRUPTS(); - FdwXactResolveOneFdwXact(fdwxact); + PG_TRY(); + { + FdwXactResolveOneFdwXact(fdwxact); + } + PG_CATCH(); + { + /* + * Failed to resolve. Re-insert the waiter to the tail of retry + * queue if the waiter is still waiting. + */ + if (waiter) + { + LWLockAcquire(FdwXactResolutionLock, LW_EXCLUSIVE); + if (waiter->fdwXactState == FDWXACT_WAITING) + { + SHMQueueDelete(&(waiter->fdwXactLinks)); + pg_write_barrier(); + waiter->fdwXactNextResolutionTs = + TimestampTzPlusMilliseconds(waiter->fdwXactNextResolutionTs, + foreign_xact_resolution_retry_interval); + FdwXactQueueInsert(waiter); + } + LWLockRelease(FdwXactResolutionLock); + } + + PG_RE_THROW(); + } + PG_END_TRY(); LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); if (fdwxact->ondisk) @@ -891,6 +1355,38 @@ FdwXactResolveFdwXacts(int *fdwxact_idxs, int nfdwxacts) remove_fdwxact(fdwxact); LWLockRelease(FdwXactLock); } + + if (!waiter) + return; + + /* + * We have resolved all foreign transactions. Remove waiter from shmem + * queue, if not detached yet. The waiter could already be detached if + * user cancelled to wait before resolution. + */ + LWLockAcquire(FdwXactResolutionLock, LW_EXCLUSIVE); + if (!SHMQueueIsDetached(&(waiter->fdwXactLinks))) + { + TransactionId wait_xid = waiter->fdwXactWaitXid; + + SHMQueueDelete(&(waiter->fdwXactLinks)); + pg_write_barrier(); + + /* Set state to complete */ + waiter->fdwXactState = FDWXACT_WAIT_COMPLETE; + + /* + * Wake up the waiter only when we have set state and removed from + * queue + */ + SetLatch(&(waiter->procLatch)); + + elog(DEBUG2, "released the proc with xid %u", wait_xid); + } + else + elog(DEBUG2, "the waiter backend had been already detached"); + + LWLockRelease(FdwXactResolutionLock); } /* @@ -1709,6 +2205,7 @@ RecoverFdwXacts(void) fdwxact->local_xid, fdwxact->serverid, fdwxact->userid))); /* recovered, so reset the flag for entries generated by redo */ + fdwxact->proc = NULL; fdwxact->inredo = false; fdwxact->valid = true; pfree(buf); @@ -1732,7 +2229,7 @@ typedef struct Datum pg_foreign_xacts(PG_FUNCTION_ARGS) { -#define PG_PREPARED_FDWXACTS_COLS 5 +#define PG_PREPARED_FDWXACTS_COLS 6 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -1804,8 +2301,8 @@ pg_foreign_xacts(PG_FUNCTION_ARGS) break; } values[3] = CStringGetTextDatum(xact_status); - values[4] = PointerGetDatum(cstring_to_text_with_len(fdwxact->fdwxact_id, - strlen(fdwxact->fdwxact_id))); + values[4] = BoolGetDatum(fdwxact->proc == NULL); + values[5] = CStringGetTextDatum(fdwxact->fdwxact_id); tuplestore_putvalues(tupstore, tupdesc, values, nulls); } @@ -1880,7 +2377,7 @@ pg_resolve_foreign_xact(PG_FUNCTION_ARGS) PG_TRY(); { - FdwXactResolveFdwXacts(&idx, 1); + FdwXactResolveFdwXacts(&idx, 1, NULL); } PG_CATCH(); { diff --git a/src/backend/access/fdwxact/launcher.c b/src/backend/access/fdwxact/launcher.c new file mode 100644 index 0000000000..d2ba6bd58c --- /dev/null +++ b/src/backend/access/fdwxact/launcher.c @@ -0,0 +1,556 @@ +/*------------------------------------------------------------------------- + * + * launcher.c + * + * The foreign transaction resolver launcher process starts foreign + * transaction resolver processes. The launcher schedules resolver + * process to be started when arrived a requested by backend process. + * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/access/fdwxact/launcher.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "funcapi.h" +#include "pgstat.h" +#include "funcapi.h" + +#include "access/fdwxact.h" +#include "access/fdwxact_launcher.h" +#include "access/fdwxact_resolver.h" +#include "access/resolver_internal.h" +#include "access/twophase.h" +#include "commands/dbcommands.h" +#include "funcapi.h" +#include "nodes/pg_list.h" +#include "postmaster/bgworker.h" +#include "storage/ipc.h" +#include "storage/proc.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" + +/* max sleep time between cycles (3min) */ +#define DEFAULT_NAPTIME_PER_CYCLE 180000L + +static void fdwxact_launcher_onexit(int code, Datum arg); +static void fdwxact_launcher_sighup(SIGNAL_ARGS); +static void fdwxact_launch_resolver(Oid dbid); +static bool fdwxact_relaunch_resolvers(void); + +static volatile sig_atomic_t got_SIGHUP = false; +static volatile sig_atomic_t got_SIGUSR2 = false; +FdwXactResolver *MyFdwXactResolver = NULL; + +/* + * Wake up the launcher process to request launching new resolvers + * immediately. + */ +void +FdwXactLauncherRequestToLaunch(void) +{ + if (FdwXactRslvCtl->launcher_pid != InvalidPid) + kill(FdwXactRslvCtl->launcher_pid, SIGUSR2); +} + +/* Report shared memory space needed by FdwXactRsoverShmemInit */ +Size +FdwXactRslvShmemSize(void) +{ + Size size = 0; + + size = add_size(size, SizeOfFdwXactRslvCtlData); + size = add_size(size, mul_size(max_foreign_xact_resolvers, + sizeof(FdwXactResolver))); + + return size; +} + +/* + * Allocate and initialize foreign transaction resolver shared + * memory. + */ +void +FdwXactRslvShmemInit(void) +{ + bool found; + + FdwXactRslvCtl = ShmemInitStruct("Foreign transactions resolvers", + FdwXactRslvShmemSize(), + &found); + + if (!IsUnderPostmaster) + { + int slot; + + /* First time through, so initialize */ + MemSet(FdwXactRslvCtl, 0, FdwXactRslvShmemSize()); + SHMQueueInit(&(FdwXactRslvCtl->fdwxact_queue)); + FdwXactRslvCtl->launcher_pid = InvalidPid; + + for (slot = 0; slot < max_foreign_xact_resolvers; slot++) + { + FdwXactResolver *resolver = &FdwXactRslvCtl->resolvers[slot]; + + memset(resolver, 0, sizeof(FdwXactResolver)); + SpinLockInit(&(resolver->mutex)); + } + } +} + +/* + * Cleanup function for fdwxact launcher + * + * Called on fdwxact launcher exit. + */ +static void +fdwxact_launcher_onexit(int code, Datum arg) +{ + FdwXactRslvCtl->launcher_pid = InvalidPid; +} + +/* SIGHUP: set flag to reload configuration at next convenient time */ +static void +fdwxact_launcher_sighup(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGHUP = true; + + SetLatch(MyLatch); + + errno = save_errno; +} + +/* SIGUSR2: set flag to launch new resolver process immediately */ +static void +fdwxact_launcher_sigusr2(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGUSR2 = true; + SetLatch(MyLatch); + + errno = save_errno; +} + +/* + * Main loop for the fdwxact launcher process. + */ +void +FdwXactLauncherMain(Datum main_arg) +{ + TimestampTz last_start_time = 0; + + ereport(DEBUG1, + (errmsg("fdwxact resolver launcher started"))); + + before_shmem_exit(fdwxact_launcher_onexit, (Datum) 0); + + Assert(FdwXactRslvCtl->launcher_pid == InvalidPid); + FdwXactRslvCtl->launcher_pid = MyProcPid; + FdwXactRslvCtl->launcher_latch = &MyProc->procLatch; + + pqsignal(SIGHUP, fdwxact_launcher_sighup); + pqsignal(SIGUSR2, fdwxact_launcher_sigusr2); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + BackgroundWorkerInitializeConnection(NULL, NULL, 0); + + /* Enter main loop */ + for (;;) + { + TimestampTz now; + long wait_time = DEFAULT_NAPTIME_PER_CYCLE; + int rc; + + CHECK_FOR_INTERRUPTS(); + ResetLatch(MyLatch); + + now = GetCurrentTimestamp(); + + /* + * Limit the start retry to once a + * foreign_xact_resolution_retry_interval but always starts when the + * backend requested. + */ + if (got_SIGUSR2 || + TimestampDifferenceExceeds(last_start_time, now, + foreign_xact_resolution_retry_interval)) + { + MemoryContext oldctx; + MemoryContext subctx; + bool launched; + + if (got_SIGUSR2) + got_SIGUSR2 = false; + + subctx = AllocSetContextCreate(TopMemoryContext, + "Foreign Transaction Launcher", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(subctx); + + /* + * Launch foreign transaction resolvers that are requested but not + * running. + */ + launched = fdwxact_relaunch_resolvers(); + if (launched) + { + last_start_time = now; + wait_time = foreign_xact_resolution_retry_interval; + } + + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(subctx); + } + else + { + /* + * The wait in previous cycle was interrupted in less than + * foreign_xact_resolution_retry_interval since last resolver + * started, this usually means crash of the resolver, so we should + * retry in foreign_xact_resolution_retry_interval again. + */ + wait_time = foreign_xact_resolution_retry_interval; + } + + /* Wait for more work */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + wait_time, + WAIT_EVENT_FDWXACT_LAUNCHER_MAIN); + + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + } + + /* Not reachable */ +} + +/* + * Request launcher to launch a new foreign transaction resolver process + * or wake up the resolver if it's already running. + */ +void +FdwXactLaunchOrWakeupResolver(void) +{ + volatile FdwXactResolver *resolver; + bool found = false; + + /* + * Looking for a resolver process that is running and working on the same + * database. + */ + LWLockAcquire(FdwXactResolverLock, LW_SHARED); + for (int i = 0; i < max_foreign_xact_resolvers; i++) + { + resolver = &FdwXactRslvCtl->resolvers[i]; + + if (resolver->in_use && + resolver->dbid == MyDatabaseId) + { + found = true; + break; + } + } + LWLockRelease(FdwXactResolverLock); + + if (found) + { + /* Found the running resolver */ + elog(DEBUG1, + "found a running foreign transaction resolver process for database %u", + MyDatabaseId); + + /* + * Wakeup the resolver. It's possible that the resolver is starting up + * and doesn't attach its slot yet. Since the resolver will find + * FdwXact entry we inserted soon we don't anything. + */ + if (resolver->latch) + SetLatch(resolver->latch); + + return; + } + + /* Otherwise wake up the launcher to launch new resolver */ + FdwXactLauncherRequestToLaunch(); +} + +/* + * Launch a foreign transaction resolver process that will connect to given + * 'dbid'. + */ +static void +fdwxact_launch_resolver(Oid dbid) +{ + BackgroundWorker bgw; + BackgroundWorkerHandle *bgw_handle; + FdwXactResolver *resolver; + int unused_slot = -1; + int i; + + LWLockAcquire(FdwXactResolverLock, LW_EXCLUSIVE); + + /* Find unused resolver slot */ + for (i = 0; i < max_foreign_xact_resolvers; i++) + { + FdwXactResolver *resolver = &FdwXactRslvCtl->resolvers[i]; + + if (!resolver->in_use) + { + unused_slot = i; + break; + } + } + + /* No unused found */ + if (i >= max_foreign_xact_resolvers) + ereport(ERROR, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("out of foreign transaction resolver slots"), + errhint("You might need to increase max_foreign_transaction_resolvers."))); + + resolver = &FdwXactRslvCtl->resolvers[unused_slot]; + resolver->in_use = true; + resolver->dbid = dbid; + LWLockRelease(FdwXactResolverLock); + + /* Register the new dynamic worker */ + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "FdwXactResolverMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "foreign transaction resolver for database %u", resolver->dbid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "foreign transaction resolver"); + bgw.bgw_restart_time = BGW_NEVER_RESTART; + bgw.bgw_notify_pid = MyProcPid; + bgw.bgw_main_arg = Int32GetDatum(unused_slot); + + if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) + { + /* Failed to launch, cleanup the worker slot */ + SpinLockAcquire(&(MyFdwXactResolver->mutex)); + resolver->in_use = false; + SpinLockRelease(&(MyFdwXactResolver->mutex)); + + ereport(WARNING, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("out of background worker slots"), + errhint("You might need to increase max_worker_processes."))); + } + + /* + * We don't need to wait until it attaches here because we're going to + * wait until all foreign transactions are resolved. + */ +} + +/* + * Launch or relaunch foreign transaction resolvers on database that has + * at least one FdwXact entry but no resolvers are running on it. + */ +static bool +fdwxact_relaunch_resolvers(void) +{ + HTAB *resolver_dbs; /* DBs resolver's running on */ + HTAB *fdwxact_dbs; /* DBs having at least one FdwXact entry */ + HASHCTL ctl; + HASH_SEQ_STATUS status; + Oid *entry; + bool launched; + + memset(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(Oid); + ctl.entrysize = sizeof(Oid); + fdwxact_dbs = hash_create("fdwxact dblist", + 32, &ctl, HASH_ELEM | HASH_BLOBS); + + /* Collect database oids that has at least one FdwXact entry to resolve */ + LWLockAcquire(FdwXactLock, LW_SHARED); + for (int i = 0; i < FdwXactCtl->num_fdwxacts; i++) + { + FdwXact fdwxact = FdwXactCtl->fdwxacts[i]; + + if (!fdwxact->valid) + continue; + + /* + * We need to launch resolver process if the process is waiting for + * foreign transaction resolution. + */ + if (fdwxact->proc && fdwxact->proc->fdwXactState == FDWXACT_WAITING) + hash_search(fdwxact_dbs, &(fdwxact->dbid), HASH_ENTER, NULL); + } + LWLockRelease(FdwXactLock); + + /* There is no FdwXact entry, no need to launch new one */ + if (hash_get_num_entries(fdwxact_dbs) == 0) + { + hash_destroy(fdwxact_dbs); + return false; + } + + resolver_dbs = hash_create("resolver dblist", + 32, &ctl, HASH_ELEM | HASH_BLOBS); + + /* Collect database oids on which resolvers are running */ + LWLockAcquire(FdwXactResolverLock, LW_SHARED); + for (int i = 0; i < max_foreign_xact_resolvers; i++) + { + FdwXactResolver *resolver = &FdwXactRslvCtl->resolvers[i]; + + if (!resolver->in_use) + continue; + + hash_search(resolver_dbs, &(resolver->dbid), HASH_ENTER, NULL); + } + LWLockRelease(FdwXactResolverLock); + + /* Find DBs on which no resolvers are running and launch new one on them */ + hash_seq_init(&status, fdwxact_dbs); + while ((entry = (Oid *) hash_seq_search(&status)) != NULL) + { + bool found; + + hash_search(resolver_dbs, entry, HASH_FIND, &found); + + if (!found) + { + /* No resolver is running on this database, launch new one */ + fdwxact_launch_resolver(*entry); + launched = true; + } + } + + hash_destroy(fdwxact_dbs); + hash_destroy(resolver_dbs); + + return launched; +} + +/* Register a background worker running the foreign transaction launcher */ +void +FdwXactLauncherRegister(void) +{ + BackgroundWorker bgw; + + if (max_foreign_xact_resolvers == 0) + return; + + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "FdwXactLauncherMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "foreign transaction launcher"); + snprintf(bgw.bgw_type, BGW_MAXLEN, + "foreign transaction launcher"); + bgw.bgw_restart_time = 5; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&bgw); +} + +bool +IsFdwXactLauncher(void) +{ + return FdwXactRslvCtl->launcher_pid == MyProcPid; +} + +/* + * Stop the fdwxact resolver running on the given database. + */ +Datum +pg_stop_foreign_xact_resolver(PG_FUNCTION_ARGS) +{ + Oid dbid = PG_GETARG_OID(0); + FdwXactResolver *resolver = NULL; + int i; + + /* Must be super user */ + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied to stop foreign transaction resolver"))); + + if (!OidIsValid(dbid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid database id"))); + + LWLockAcquire(FdwXactResolverLock, LW_SHARED); + + /* Find the running resolver process on the given database */ + for (i = 0; i < max_foreign_xact_resolvers; i++) + { + resolver = &FdwXactRslvCtl->resolvers[i]; + + /* found! */ + if (resolver->in_use && resolver->dbid == dbid) + break; + } + + if (i >= max_foreign_xact_resolvers) + ereport(ERROR, + (errmsg("there is no running foreign transaction resolver process on database %d", + dbid))); + + /* Found the resolver, terminate it ... */ + kill(resolver->pid, SIGTERM); + + /* ... and wait for it to die */ + for (;;) + { + int rc; + + /* is it gone? */ + if (!resolver->in_use) + break; + + LWLockRelease(FdwXactResolverLock); + + /* Wait a bit --- we don't expect to have to wait long. */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10L, WAIT_EVENT_BGWORKER_SHUTDOWN); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + LWLockAcquire(FdwXactResolverLock, LW_SHARED); + } + + LWLockRelease(FdwXactResolverLock); + + PG_RETURN_BOOL(true); +} diff --git a/src/backend/access/fdwxact/resolver.c b/src/backend/access/fdwxact/resolver.c new file mode 100644 index 0000000000..3e93a5a84f --- /dev/null +++ b/src/backend/access/fdwxact/resolver.c @@ -0,0 +1,406 @@ +/*------------------------------------------------------------------------- + * + * resolver.c + * + * The foreign transaction resolver background worker resolves foreign + * transactions that participate to a distributed transaction. A resolver + * process is started by foreign transaction launcher for each databases. + * + * A resolver process continues to resolve foreign transactions on the + * database, which the backend process is waiting for resolution. + * + * Normal termination is by SIGTERM, which instructs the resolver process + * to exit(0) at the next convenient moment. Emergency termination is by + * SIGQUIT; like any backend. The resolver process also terminate by timeouts + * only if there is no pending foreign transactions on the database waiting + * to be resolved. + * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/access/fdwxact/resolver.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include +#include + +#include "access/fdwxact.h" +#include "access/fdwxact_resolver.h" +#include "access/fdwxact_launcher.h" +#include "access/resolver_internal.h" +#include "access/transam.h" +#include "access/twophase.h" +#include "access/xact.h" +#include "commands/dbcommands.h" +#include "funcapi.h" +#include "libpq/libpq.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "storage/ipc.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/timeout.h" +#include "utils/timestamp.h" + +/* max sleep time between cycles (3min) */ +#define DEFAULT_NAPTIME_PER_CYCLE 180000L + +/* GUC parameters */ +int foreign_xact_resolution_retry_interval; +int foreign_xact_resolver_timeout = 60 * 1000; + +FdwXactRslvCtlData *FdwXactRslvCtl; + +static void FXRslvLoop(void); +static long FXRslvComputeSleepTime(TimestampTz now, TimestampTz targetTime); +static void FXRslvCheckTimeout(TimestampTz now); + +static void fdwxact_resolver_sighup(SIGNAL_ARGS); +static void fdwxact_resolver_onexit(int code, Datum arg); +static void fdwxact_resolver_detach(void); +static void fdwxact_resolver_attach(int slot); +static void hold_fdwxacts(PGPROC *waiter); + +/* Flags set by signal handlers */ +static volatile sig_atomic_t got_SIGHUP = false; +static TimestampTz last_resolution_time = -1; + +/* + * held_fdwxacts has indexes of FdwXact which the resolver marked + * as in-processing. We clear that flag from those entries on failure. + */ +static int *held_fdwxacts = NULL; +static int nheld; + +/* Set flag to reload configuration at next convenient time */ +static void +fdwxact_resolver_sighup(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGHUP = true; + + SetLatch(MyLatch); + + errno = save_errno; +} + +/* + * Detach the resolver and cleanup the resolver info. + */ +static void +fdwxact_resolver_detach(void) +{ + /* Block concurrent access */ + LWLockAcquire(FdwXactResolverLock, LW_EXCLUSIVE); + + MyFdwXactResolver->pid = InvalidPid; + MyFdwXactResolver->in_use = false; + MyFdwXactResolver->dbid = InvalidOid; + + LWLockRelease(FdwXactResolverLock); +} + +/* + * Cleanup up foreign transaction resolver info. + */ +static void +fdwxact_resolver_onexit(int code, Datum arg) +{ + fdwxact_resolver_detach(); + + /* Release the held foreign transaction entries */ + for (int i = 0; i < nheld; i++) + { + FdwXact fdwxact = FdwXactCtl->fdwxacts[held_fdwxacts[i]]; + + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + fdwxact->locking_backend = InvalidBackendId; + LWLockRelease(FdwXactLock); + } + + /* + * There might be other waiting online transactions. So request to + * re-launch. + */ + FdwXactLauncherRequestToLaunch(); +} + +/* + * Attach to a slot. + */ +static void +fdwxact_resolver_attach(int slot) +{ + /* Block concurrent access */ + LWLockAcquire(FdwXactResolverLock, LW_EXCLUSIVE); + + Assert(slot >= 0 && slot < max_foreign_xact_resolvers); + MyFdwXactResolver = &FdwXactRslvCtl->resolvers[slot]; + + if (!MyFdwXactResolver->in_use) + { + LWLockRelease(FdwXactResolverLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("foreign transaction resolver slot %d is empty, cannot attach", + slot))); + } + + Assert(OidIsValid(MyFdwXactResolver->dbid)); + + MyFdwXactResolver->pid = MyProcPid; + MyFdwXactResolver->latch = &MyProc->procLatch; + + before_shmem_exit(fdwxact_resolver_onexit, (Datum) 0); + + LWLockRelease(FdwXactResolverLock); +} + +/* Foreign transaction resolver entry point */ +void +FdwXactResolverMain(Datum main_arg) +{ + int slot = DatumGetInt32(main_arg); + + /* Attach to a slot */ + fdwxact_resolver_attach(slot); + + /* Establish signal handlers */ + pqsignal(SIGHUP, fdwxact_resolver_sighup); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* Connect to our database */ + BackgroundWorkerInitializeConnectionByOid(MyFdwXactResolver->dbid, InvalidOid, 0); + + StartTransactionCommand(); + ereport(LOG, + (errmsg("foreign transaction resolver for database \"%s\" has started", + get_database_name(MyFdwXactResolver->dbid)))); + CommitTransactionCommand(); + + held_fdwxacts = palloc(sizeof(int) * max_prepared_foreign_xacts); + nheld = 0; + + /* Initialize stats to a sanish value */ + last_resolution_time = GetCurrentTimestamp(); + + /* Run the main loop */ + FXRslvLoop(); + + proc_exit(0); +} + +/* + * Fdwxact resolver main loop + */ +static void +FXRslvLoop(void) +{ + MemoryContext resolver_ctx; + + resolver_ctx = AllocSetContextCreate(TopMemoryContext, + "Foreign Transaction Resolver", + ALLOCSET_DEFAULT_SIZES); + + /* Enter main loop */ + for (;;) + { + TransactionId waitXid = InvalidTransactionId; + TimestampTz resolutionTs = -1; + TimestampTz now; + int rc; + long sleep_time = DEFAULT_NAPTIME_PER_CYCLE; + + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + MemoryContextSwitchTo(resolver_ctx); + + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + + now = GetCurrentTimestamp(); + + /* + * Process waiter until either the queue gets empty or the queue has + * only waiters that have a future resolution timestamp. + */ + for (;;) + { + PGPROC *waiter; + + CHECK_FOR_INTERRUPTS(); + + LWLockAcquire(FdwXactResolutionLock, LW_SHARED); + + /* Get the waiter from the queue */ + waiter = FdwXactGetWaiter(now, &resolutionTs, &waitXid); + + if (!waiter) + { + /* Not found, break */ + LWLockRelease(FdwXactResolutionLock); + break; + } + + /* Hold the waiter's foreign transactions */ + hold_fdwxacts(waiter); + Assert(nheld > 0); + + LWLockRelease(FdwXactResolutionLock); + + /* + * Resolve the waiter's foreign transactions and release the + * waiter. + */ + StartTransactionCommand(); + FdwXactResolveFdwXacts(held_fdwxacts, nheld, waiter); + CommitTransactionCommand(); + + last_resolution_time = now; + } + + FXRslvCheckTimeout(now); + + sleep_time = FXRslvComputeSleepTime(now, resolutionTs); + + MemoryContextResetAndDeleteChildren(resolver_ctx); + MemoryContextSwitchTo(TopMemoryContext); + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + sleep_time, + WAIT_EVENT_FDWXACT_RESOLVER_MAIN); + + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + } +} + +/* + * Check whether there have been foreign transactions by the backend within + * foreign_xact_resolver_timeout and shutdown if not. + */ +static void +FXRslvCheckTimeout(TimestampTz now) +{ + TimestampTz timeout; + + if (foreign_xact_resolver_timeout == 0) + return; + + timeout = TimestampTzPlusMilliseconds(last_resolution_time, + foreign_xact_resolver_timeout); + + if (now < timeout) + return; + + LWLockAcquire(FdwXactResolutionLock, LW_SHARED); + if (!FdwXactWaiterExists(MyDatabaseId)) + { + /* There is no waiting backend */ + StartTransactionCommand(); + ereport(LOG, + (errmsg("foreign transaction resolver for database \"%s\" will stop because the timeout", + get_database_name(MyDatabaseId)))); + CommitTransactionCommand(); + + /* + * Keep holding FdwXactResolutionLock until detached the slot. It is + * necessary to prevent a race condition; a waiter enqueues after + * FdwXactWaiterExists check. + */ + fdwxact_resolver_detach(); + LWLockRelease(FdwXactResolutionLock); + proc_exit(0); + } + else + elog(DEBUG2, "resolver reached to the timeout but don't exist as the queue is not empty"); + + LWLockRelease(FdwXactResolutionLock); +} + +/* + * Compute how long we should sleep by the next cycle. We can sleep until the time + * out or the next resolution time given by nextResolutionTs. + */ +static long +FXRslvComputeSleepTime(TimestampTz now, TimestampTz nextResolutionTs) +{ + long sleeptime = DEFAULT_NAPTIME_PER_CYCLE; + + if (foreign_xact_resolver_timeout > 0) + { + TimestampTz timeout; + long sec_to_timeout; + int microsec_to_timeout; + + /* Compute relative time until wakeup. */ + timeout = TimestampTzPlusMilliseconds(last_resolution_time, + foreign_xact_resolver_timeout); + TimestampDifference(now, timeout, + &sec_to_timeout, µsec_to_timeout); + + sleeptime = Min(sleeptime, + sec_to_timeout * 1000 + microsec_to_timeout / 1000); + } + + if (nextResolutionTs > 0) + { + long sec_to_timeout; + int microsec_to_timeout; + + TimestampDifference(now, nextResolutionTs, + &sec_to_timeout, µsec_to_timeout); + + sleeptime = Min(sleeptime, + sec_to_timeout * 1000 + microsec_to_timeout / 1000); + } + + return sleeptime; +} + +bool +IsFdwXactResolver(void) +{ + return MyFdwXactResolver != NULL; +} + +/* + * Lock foreign transactions associated with the given waiter's transaction + * as in-processing. The caller must hold FdwXactResolutionLock so that + * the waiter doesn't change its state. + */ +static void +hold_fdwxacts(PGPROC *waiter) +{ + Assert(LWLockHeldByMe(FdwXactResolutionLock)); + + nheld = 0; + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + for (int i = 0; i < FdwXactCtl->num_fdwxacts; i++) + { + FdwXact fdwxact = FdwXactCtl->fdwxacts[i]; + + if (fdwxact->valid && fdwxact->local_xid == waiter->fdwXactWaitXid) + { + Assert(fdwxact->proc->fdwXactState == FDWXACT_WAITING); + Assert(fdwxact->dbid == waiter->databaseId); + + held_fdwxacts[nheld++] = i; + fdwxact->locking_backend = MyBackendId; + } + } + LWLockRelease(FdwXactLock); +} diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 6f1f4a2da2..2571473729 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -77,6 +77,7 @@ #include #include "access/commit_ts.h" +#include "access/fdwxact.h" #include "access/htup_details.h" #include "access/subtrans.h" #include "access/transam.h" @@ -2216,6 +2217,14 @@ RecordTransactionCommitPrepared(TransactionId xid, XLogRecPtr recptr; TimestampTz committs = GetCurrentTimestamp(); bool replorigin; + bool need_fdwxact_commit; + bool canceled = false; + + /* + * Prepare foreign transactions involving this prepared transaction + * if exist. + */ + need_fdwxact_commit = CollectFdwXactParticipants(xid); /* * Are we using the replication origins feature? Or, in other words, are @@ -2280,12 +2289,25 @@ RecordTransactionCommitPrepared(TransactionId xid, END_CRIT_SECTION(); /* - * Wait for synchronous replication, if required. + * Wait for both synchronous replication and foreign transaction + * resolution, if required * * Note that at this stage we have marked clog, but still show as running * in the procarray and continue to hold locks. */ - SyncRepWaitForLSN(recptr, true); + canceled = SyncRepWaitForLSN(XactLastRecEnd, true); + + if (need_fdwxact_commit) + { + /* Set the collected foreign transaction participants */ + SetFdwXactParticipants(xid); + + if (!canceled) + FdwXactWaitForResolution(xid, true); + + ForgetAllFdwXactParticipants(); + } + } /* @@ -2305,6 +2327,14 @@ RecordTransactionAbortPrepared(TransactionId xid, const char *gid) { XLogRecPtr recptr; + bool need_fdwxact_commit; + bool canceled = false; + + /* + * Prepare foreign transactions involving this prepared transaction + * if exist. + */ + need_fdwxact_commit = CollectFdwXactParticipants(xid); /* * Catch the scenario where we aborted partway through @@ -2339,12 +2369,24 @@ RecordTransactionAbortPrepared(TransactionId xid, END_CRIT_SECTION(); /* - * Wait for synchronous replication, if required. + * Wait for both synchronous replication and foreign transaction + * resolution, if required * * Note that at this stage we have marked clog, but still show as running * in the procarray and continue to hold locks. */ - SyncRepWaitForLSN(recptr, false); + canceled = SyncRepWaitForLSN(XactLastRecEnd, true); + + if (need_fdwxact_commit) + { + /* Set the collected foreign transaction participants */ + SetFdwXactParticipants(xid); + + if (!canceled) + FdwXactWaitForResolution(xid, false); + + ForgetAllFdwXactParticipants(); + } } /* diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 0a8d1da4bd..0dcc3182ec 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2566,6 +2566,9 @@ PrepareTransaction(void) */ PostPrepare_Twophase(); + /* Release held FdwXact entries */ + PostPrepare_FdwXact(); + /* PREPARE acts the same as COMMIT as far as GUC is concerned */ AtEOXact_GUC(true, 1); AtEOXact_SPI(true); diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 5a9a0e3435..b2384f9ab9 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -13,6 +13,8 @@ #include "postgres.h" #include "access/parallel.h" +#include "access/fdwxact_resolver.h" +#include "access/fdwxact_launcher.h" #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgstat.h" @@ -128,6 +130,12 @@ static const struct }, { "ApplyWorkerMain", ApplyWorkerMain + }, + { + "FdwXactResolverMain", FdwXactResolverMain + }, + { + "FdwXactLauncherMain", FdwXactLauncherMain } }; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 6bf5a59b42..120cfa5773 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3663,6 +3663,12 @@ pgstat_get_wait_activity(WaitEventActivity w) case WAIT_EVENT_CHECKPOINTER_MAIN: event_name = "CheckpointerMain"; break; + case WAIT_EVENT_FDWXACT_RESOLVER_MAIN: + event_name = "FdwXactResolverMain"; + break; + case WAIT_EVENT_FDWXACT_LAUNCHER_MAIN: + event_name = "FdwXactLauncherMain"; + break; case WAIT_EVENT_LOGICAL_APPLY_MAIN: event_name = "LogicalApplyMain"; break; @@ -3773,6 +3779,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_EXECUTE_GATHER: event_name = "ExecuteGather"; break; + case WAIT_EVENT_FDWXACT_RESOLUTION: + event_name = "FdwXactResolution"; + break; case WAIT_EVENT_HASH_BATCH_ALLOCATE: event_name = "HashBatchAllocate"; break; diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 81e6cb9ca2..67f3cf0e5e 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -94,6 +94,7 @@ #endif #include "access/fdwxact.h" +#include "access/fdwxact_launcher.h" #include "access/transam.h" #include "access/xlog.h" #include "bootstrap/bootstrap.h" @@ -926,6 +927,10 @@ PostmasterMain(int argc, char *argv[]) ereport(ERROR, (errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"replica\" or \"logical\""))); + if (max_prepared_foreign_xacts > 0 && max_foreign_xact_resolvers == 0) + ereport(ERROR, + (errmsg("preparing foreign transactions (max_prepared_foreign_transactions > 0) requires max_foreign_transaction_resolvers > 0"))); + /* * Other one-time internal sanity checks can go here, if they are fast. * (Put any slow processing further down, after postmaster.pid creation.) @@ -990,12 +995,13 @@ PostmasterMain(int argc, char *argv[]) LocalProcessControlFile(false); /* - * Register the apply launcher. Since it registers a background worker, - * it needs to be called before InitializeMaxBackends(), and it's probably - * a good idea to call it before any modules had chance to take the - * background worker slots. + * Register the apply launcher and foreign transaction launcher. Since + * it registers a background worker, it needs to be called before + * InitializeMaxBackends(), and it's probably a good idea to call it + * before any modules had chance to take the background worker slots. */ ApplyLauncherRegister(); + FdwXactLauncherRegister(); /* * process any libraries that should be preloaded at postmaster start diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 6e8c76537a..a89b99225e 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -143,13 +143,17 @@ static bool SyncRepQueueIsOrderedByLSN(int mode); * represents a commit record. If it doesn't, then we wait only for the WAL * to be flushed if synchronous_commit is set to the higher level of * remote_apply, because only commit records provide apply feedback. + * + * This function return true if the wait is cancelelled due to an + * interruption. */ -void +bool SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) { char *new_status = NULL; const char *old_status; int mode; + bool canceled = false; /* * This should be called while holding interrupts during a transaction @@ -174,7 +178,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) */ if (!SyncRepRequested() || !((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined) - return; + return false; /* Cap the level for anything other than commit to remote flush only. */ if (commit) @@ -200,7 +204,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) lsn <= WalSndCtl->lsn[mode]) { LWLockRelease(SyncRepLock); - return; + return false; } /* @@ -270,6 +274,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); whereToSendOutput = DestNone; SyncRepCancelWait(); + canceled = true; break; } @@ -286,6 +291,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) (errmsg("canceling wait for synchronous replication due to user request"), errdetail("The transaction has already committed locally, but might not have been replicated to the standby."))); SyncRepCancelWait(); + canceled = true; break; } @@ -305,6 +311,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) ProcDiePending = true; whereToSendOutput = DestNone; SyncRepCancelWait(); + canceled = true; break; } } @@ -328,6 +335,8 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) set_ps_display(new_status); pfree(new_status); } + + return canceled; } /* diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 2d7191d3cd..271fd35884 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -17,6 +17,7 @@ #include "access/clog.h" #include "access/commit_ts.h" #include "access/fdwxact.h" +#include "access/fdwxact_launcher.h" #include "access/heapam.h" #include "access/multixact.h" #include "access/nbtree.h" @@ -151,6 +152,7 @@ CreateSharedMemoryAndSemaphores(void) size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); size = add_size(size, FdwXactShmemSize()); + size = add_size(size, FdwXactRslvShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -270,6 +272,7 @@ CreateSharedMemoryAndSemaphores(void) SyncScanShmemInit(); AsyncShmemInit(); FdwXactShmemInit(); + FdwXactRslvShmemInit(); #ifdef EXEC_BACKEND diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index dc29a7ea6f..a6d40446ce 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -54,3 +54,5 @@ XactTruncationLock 44 WrapLimitsVacuumLock 46 NotifyQueueTailLock 47 FdwXactLock 48 +FdwXactResolverLock 49 +FdwXactResolutionLock 50 diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 88566bd9fa..0b9d340c49 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -35,6 +35,7 @@ #include #include +#include "access/fdwxact.h" #include "access/transam.h" #include "access/twophase.h" #include "access/xact.h" @@ -417,6 +418,10 @@ InitProcess(void) MyProc->syncRepState = SYNC_REP_NOT_WAITING; SHMQueueElemInit(&(MyProc->syncRepLinks)); + /* Initialize fields for fdwxact */ + MyProc->fdwXactState = FDWXACT_NOT_WAITING; + SHMQueueElemInit(&(MyProc->fdwXactLinks)); + /* Initialize fields for group XID clearing. */ MyProc->procArrayGroupMember = false; MyProc->procArrayGroupMemberXid = InvalidTransactionId; @@ -817,6 +822,9 @@ ProcKill(int code, Datum arg) /* Make sure we're out of the sync rep lists */ SyncRepCleanupAtProcExit(); + /* Make sure we're out of the fdwxact lists */ + FdwXactCleanupAtProcExit(); + #ifdef USE_ASSERT_CHECKING { int i; diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 411cfadbff..496e2b3a4a 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -36,6 +36,8 @@ #include "rusagestub.h" #endif +#include "access/fdwxact_launcher.h" +#include "access/fdwxact_resolver.h" #include "access/parallel.h" #include "access/printtup.h" #include "access/xact.h" @@ -3054,6 +3056,18 @@ ProcessInterrupts(void) */ proc_exit(1); } + else if (IsFdwXactResolver()) + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating foreign transaction resolver due to administrator command"))); + else if (IsFdwXactLauncher()) + { + /* + * The foreign transaction launcher can be stopped at any time. + * Use exit status 1 so the background worker is restarted. + */ + proc_exit(1); + } else if (RecoveryConflictPending && RecoveryConflictRetryable) { pgstat_report_recovery_conflict(RecoveryConflictReason); diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index b468c5628c..b3960e9a1b 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -760,6 +760,10 @@ const char *const config_group_names[] = gettext_noop("Client Connection Defaults / Other Defaults"), /* LOCK_MANAGEMENT */ gettext_noop("Lock Management"), + /* FOREIGN_TRANSACTION */ + gettext_noop("Foreign Transaction"), + /* FOREIGN_TRANSACTION_RESOLVER */ + gettext_noop("Foreign Transaction / Resolver"), /* COMPAT_OPTIONS */ gettext_noop("Version and Platform Compatibility"), /* COMPAT_OPTIONS_PREVIOUS */ @@ -2469,6 +2473,39 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"foreign_transaction_resolver_timeout", PGC_SIGHUP, FOREIGN_TRANSACTION_RESOLVER, + gettext_noop("Sets the maximum time to wait for foreign transaction resolution."), + NULL, + GUC_UNIT_MS + }, + &foreign_xact_resolver_timeout, + 60 * 1000, 0, INT_MAX, + NULL, NULL, NULL + }, + + { + {"max_foreign_transaction_resolvers", PGC_POSTMASTER, RESOURCES_MEM, + gettext_noop("Maximum number of foreign transaction resolution processes."), + NULL + }, + &max_foreign_xact_resolvers, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + + { + {"foreign_transaction_resolution_retry_interval", PGC_SIGHUP, FOREIGN_TRANSACTION_RESOLVER, + gettext_noop("Sets the time to wait before retrying to resolve foreign transaction " + "after a failed attempt."), + NULL, + GUC_UNIT_MS + }, + &foreign_xact_resolution_retry_interval, + 5000, 1, INT_MAX, + NULL, NULL, NULL + }, + #ifdef LOCK_DEBUG { {"trace_lock_oidmin", PGC_SUSET, DEVELOPER_OPTIONS, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 863e8ccc3a..2ed09cb347 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -733,6 +733,18 @@ #max_pred_locks_per_page = 2 # min 0 +#------------------------------------------------------------------------------ +# FOREIGN TRANSACTION +#------------------------------------------------------------------------------ + +#max_foreign_transaction_resolvers = 0 # max number of resolver process + # (change requires restart) +#foreign_transaction_resolver_timeout = 60s # in milliseconds; 0 disables +#foreign_transaction_resolution_retry_interval = 5s # time to wait before + # retrying to resolve + # foreign transactions + # after a failed attempt + #------------------------------------------------------------------------------ # VERSION AND PLATFORM COMPATIBILITY #------------------------------------------------------------------------------ diff --git a/src/include/access/fdwxact.h b/src/include/access/fdwxact.h index 6ba1a475fc..1ae70cbed6 100644 --- a/src/include/access/fdwxact.h +++ b/src/include/access/fdwxact.h @@ -16,6 +16,11 @@ #include "storage/shmem.h" #include "storage/s_lock.h" +/* fdwXactState */ +#define FDWXACT_NOT_WAITING 0 +#define FDWXACT_WAITING 1 +#define FDWXACT_WAIT_COMPLETE 2 + /* Flag passed to FDW transaction management APIs */ #define FDWXACT_FLAG_ONEPHASE 0x01 /* transaction can commit/rollback * without preparation */ @@ -40,6 +45,13 @@ typedef struct FdwXactData TransactionId local_xid; /* XID of local transaction */ + /* + * A backend process that executed the distributed transaction. The owner + * and a process locking this entry can be different during transaction + * resolution as the resolver takes over the entry. + */ + PGPROC *proc; /* process that executed the distributed tx. */ + /* Information relevant with foreign transaction */ Oid dbid; Oid serverid; @@ -106,12 +118,26 @@ typedef struct FdwXactRslvState /* GUC parameters */ extern int max_prepared_foreign_xacts; +extern int max_foreign_xact_resolvers; +extern int foreign_xact_resolution_retry_interval; +extern int foreign_xact_resolver_timeout; +extern int foreign_twophase_commit; /* Function declarations */ extern Size FdwXactShmemSize(void); extern void FdwXactShmemInit(void); extern void AtEOXact_FdwXact(bool is_commit); extern void PrePrepare_FdwXact(void); +extern void PostPrepare_FdwXact(void); +extern bool CollectFdwXactParticipants(TransactionId xid); +extern void SetFdwXactParticipants(TransactionId xid); +extern void FdwXactCleanupAtProcExit(void); +extern void FdwXactWaitForResolution(TransactionId wait_xid, bool commit); +extern PGPROC *FdwXactGetWaiter(TimestampTz now, TimestampTz *nextResolutionTs_p, + TransactionId *waitXid_p); +extern bool FdwXactWaiterExists(Oid dbid); +extern void FdwXactResolveFdwXacts(int *fdwxact_idxs, int nfdwxacts, PGPROC *waiter); +extern void ForgetAllFdwXactParticipants(void); extern bool FdwXactExists(Oid dbid, Oid serverid, Oid userid); extern void CheckPointFdwXacts(XLogRecPtr redo_horizon); extern void RecreateFdwXactFile(Oid dbid, TransactionId xid, Oid serverid, diff --git a/src/include/access/fdwxact_launcher.h b/src/include/access/fdwxact_launcher.h new file mode 100644 index 0000000000..688b43b8d0 --- /dev/null +++ b/src/include/access/fdwxact_launcher.h @@ -0,0 +1,28 @@ +/*------------------------------------------------------------------------- + * + * fdwxact_launcher.h + * PostgreSQL foreign transaction launcher definitions + * + * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * + * src/include/access/fdwxact_launcher.h + * + *------------------------------------------------------------------------- + */ + +#ifndef FDWXACT_LAUNCHER_H +#define FDWXACT_LAUNCHER_H + +#include "access/fdwxact.h" + +extern void FdwXactLauncherRegister(void); +extern void FdwXactLauncherMain(Datum main_arg); +extern void FdwXactLauncherRequestToLaunch(void); +extern void FdwXactLaunchOrWakeupResolver(void); +extern Size FdwXactRslvShmemSize(void); +extern void FdwXactRslvShmemInit(void); +extern bool IsFdwXactLauncher(void); + + +#endif /* FDWXACT_LAUNCHER_H */ diff --git a/src/include/access/fdwxact_resolver.h b/src/include/access/fdwxact_resolver.h new file mode 100644 index 0000000000..779848113c --- /dev/null +++ b/src/include/access/fdwxact_resolver.h @@ -0,0 +1,23 @@ +/*------------------------------------------------------------------------- + * + * fdwxact_resolver.h + * PostgreSQL foreign transaction resolver definitions + * + * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * + * src/include/access/fdwxact_resolver.h + * + *------------------------------------------------------------------------- + */ +#ifndef FDWXACT_RESOLVER_H +#define FDWXACT_RESOLVER_H + +#include "access/fdwxact.h" + +extern void FdwXactResolverMain(Datum main_arg); +extern bool IsFdwXactResolver(void); + +extern int foreign_xact_resolver_timeout; + +#endif /* FDWXACT_RESOLVER_H */ diff --git a/src/include/access/resolver_internal.h b/src/include/access/resolver_internal.h new file mode 100644 index 0000000000..c935471936 --- /dev/null +++ b/src/include/access/resolver_internal.h @@ -0,0 +1,63 @@ +/*------------------------------------------------------------------------- + * + * resolver_internal.h + * Internal headers shared by fdwxact resolvers. + * + * Portions Copyright (c) 2020, PostgreSQL Global Development Group + * + * src/include/access/resolver_internal.h + * + *------------------------------------------------------------------------- + */ + +#ifndef RESOLVER_INTERNAL_H +#define RESOLVER_INTERNAL_H + +#include "storage/latch.h" +#include "storage/shmem.h" +#include "storage/spin.h" +#include "utils/timestamp.h" + +/* + * Each foreign transaction resolver has a FdwXactResolver struct in + * shared memory. This struct is protected by FdwXactResolverLaunchLock. + */ +typedef struct FdwXactResolver +{ + pid_t pid; /* this resolver's PID, or 0 if not active */ + Oid dbid; /* database oid */ + + /* Indicates if this slot is used of free */ + bool in_use; + + /* Protect shared variables shown above */ + slock_t mutex; + + /* + * Pointer to the resolver's patch. Used by backends to wake up this + * resolver when it has work to do. NULL if the resolver isn't active. + */ + Latch *latch; +} FdwXactResolver; + +/* There is one FdwXactRslvCtlData struct for the whole database cluster */ +typedef struct FdwXactRslvCtlData +{ + /* Foreign transaction resolution queue. Protected by FdwXactLock */ + SHM_QUEUE fdwxact_queue; + + /* Supervisor process and latch */ + pid_t launcher_pid; + Latch *launcher_latch; + + FdwXactResolver resolvers[FLEXIBLE_ARRAY_MEMBER]; +} FdwXactRslvCtlData; +#define SizeOfFdwXactRslvCtlData \ + (offsetof(FdwXactRslvCtlData, resolvers) + sizeof(FdwXactResolver)) + +extern FdwXactRslvCtlData *FdwXactRslvCtl; + +extern FdwXactResolver *MyFdwXactResolver; +extern FdwXactRslvCtlData *FdwXactRslvCtl; + +#endif /* RESOLVER_INTERNAL_H */ diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 52f71ccd17..37d12adda2 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -6136,6 +6136,11 @@ proname => 'pg_walfile_name', prorettype => 'text', proargtypes => 'pg_lsn', prosrc => 'pg_walfile_name' }, +{ oid => '9709', + descr => 'stop a foreign transaction resolver process running on the given database', + proname => 'pg_stop_foreign_xact_resolver', provolatile => 'v', prorettype => 'bool', + proargtypes => 'oid', prosrc => 'pg_stop_foreign_xact_resolver'}, + { oid => '3165', descr => 'difference in bytes, given two wal locations', proname => 'pg_wal_lsn_diff', prorettype => 'numeric', proargtypes => 'pg_lsn pg_lsn', prosrc => 'pg_wal_lsn_diff' }, diff --git a/src/include/pgstat.h b/src/include/pgstat.h index d43dcce56f..7f3ed0bc71 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -806,6 +806,8 @@ typedef enum WAIT_EVENT_BGWRITER_HIBERNATE, WAIT_EVENT_BGWRITER_MAIN, WAIT_EVENT_CHECKPOINTER_MAIN, + WAIT_EVENT_FDWXACT_RESOLVER_MAIN, + WAIT_EVENT_FDWXACT_LAUNCHER_MAIN, WAIT_EVENT_LOGICAL_APPLY_MAIN, WAIT_EVENT_LOGICAL_LAUNCHER_MAIN, WAIT_EVENT_PGSTAT_MAIN, @@ -853,6 +855,7 @@ typedef enum WAIT_EVENT_CHECKPOINT_DONE, WAIT_EVENT_CHECKPOINT_START, WAIT_EVENT_EXECUTE_GATHER, + WAIT_EVENT_FDWXACT_RESOLUTION, WAIT_EVENT_HASH_BATCH_ALLOCATE, WAIT_EVENT_HASH_BATCH_ELECT, WAIT_EVENT_HASH_BATCH_LOAD, diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h index 9d286b66c6..cffab9c721 100644 --- a/src/include/replication/syncrep.h +++ b/src/include/replication/syncrep.h @@ -82,7 +82,7 @@ extern char *syncrep_parse_error_msg; extern char *SyncRepStandbyNames; /* called by user backend */ -extern void SyncRepWaitForLSN(XLogRecPtr lsn, bool commit); +extern bool SyncRepWaitForLSN(XLogRecPtr lsn, bool commit); /* called at backend exit */ extern void SyncRepCleanupAtProcExit(void); diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 9c9a50ae45..06c9f4095f 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -16,6 +16,7 @@ #include "access/clog.h" #include "access/xlogdefs.h" +#include "datatype/timestamp.h" #include "lib/ilist.h" #include "storage/latch.h" #include "storage/lock.h" @@ -188,6 +189,17 @@ struct PGPROC int syncRepState; /* wait state for sync rep */ SHM_QUEUE syncRepLinks; /* list link if process is in syncrep queue */ + /* + * Info to allow us to wait for foreign transaction to be resolved, if + * needed. + */ + TransactionId fdwXactWaitXid; /* waiting for foreign transaction involved with + * this transaction id to be resolved */ + int fdwXactState; /* wait state for foreign transaction + * resolution */ + SHM_QUEUE fdwXactLinks; /* list link if process is in queue */ + TimestampTz fdwXactNextResolutionTs; + /* * All PROCLOCK objects for locks held or awaited by this backend are * linked into one of these lists, according to the partition number of diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h index 04431d0eb2..a00ca73355 100644 --- a/src/include/utils/guc_tables.h +++ b/src/include/utils/guc_tables.h @@ -96,6 +96,8 @@ enum config_group CLIENT_CONN_PRELOAD, CLIENT_CONN_OTHER, LOCK_MANAGEMENT, + FOREIGN_TRANSACTION, + FOREIGN_TRANSACTION_RESOLVER, COMPAT_OPTIONS, COMPAT_OPTIONS_PREVIOUS, COMPAT_OPTIONS_CLIENT, -- 2.23.0