Transactions involving multiple postgres foreign servers, take 2 - Mailing list pgsql-hackers
From | Kyotaro Horiguchi |
---|---|
Subject | Transactions involving multiple postgres foreign servers, take 2 |
Date | |
Msg-id | 20191206.173215.1818665441859410805.horikyota.ntt@gmail.com |
In response to | Re: [HACKERS] Transactions involving multiple postgres foreign servers, take 2 (Michael Paquier <michael@paquier.xyz>) |
Responses |
Re: Transactions involving multiple postgres foreign servers, take 2
|
List | pgsql-hackers |
Hello.

This is the rebased (and slightly fixed) version of the patch. It applies on top of master HEAD and passes all provided tests.

I took over this work from Sawada-san. I'll begin by reviewing the current patch.

regards.

-- 
Kyotaro Horiguchi
NTT Open Source Software Center

From 733f1e413ef2b2fe1d3ecba41eb4cd8e355ab826 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Date: Thu, 5 Dec 2019 16:59:47 +0900
Subject: [PATCH v26 1/5] Keep track of writing on non-temporary relation

Original Author: Masahiko Sawada <sawada.mshk@gmail.com>
---
 src/backend/executor/nodeModifyTable.c | 12 ++++++++++++
 src/include/access/xact.h | 6 ++++++
 2 files changed, 18 insertions(+)

diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index e3eb9d7b90..cd91f9c8a8 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -587,6 +587,10 @@ ExecInsert(ModifyTableState *mtstate,
                                estate->es_output_cid,
                                0, NULL);
 
+        /* Make note that we've written to a non-temporary relation */
+        if (RelationNeedsWAL(resultRelationDesc))
+            MyXactFlags |= XACT_FLAGS_WROTENONTEMPREL;
+
         /* insert index entries for tuple */
         if (resultRelInfo->ri_NumIndices > 0)
             recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL,
@@ -938,6 +942,10 @@ ldelete:;
     if (tupleDeleted)
         *tupleDeleted = true;
 
+    /* Make note that we've written to a non-temporary relation */
+    if (RelationNeedsWAL(resultRelationDesc))
+        MyXactFlags |= XACT_FLAGS_WROTENONTEMPREL;
+
     /*
      * If this delete is the result of a partition key update that moved the
      * tuple to a new partition, put this row into the transition OLD TABLE,
@@ -1447,6 +1455,10 @@ lreplace:;
             recheckIndexes = ExecInsertIndexTuples(slot, estate, false, NULL, NIL);
     }
 
+    /* Make note that we've written to a non-temporary relation */
+    if (RelationNeedsWAL(resultRelationDesc))
+        MyXactFlags |= XACT_FLAGS_WROTENONTEMPREL;
+
     if (canSetTag)
         (estate->es_processed)++;
 
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 9d2899dea1..cb5c4935d2 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -102,6 +102,12 @@ extern int MyXactFlags;
  */
 #define XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK  (1U << 1)
 
+/*
+ * XACT_FLAGS_WROTENONTEMPREL - set when we wrote data to a non-temporary
+ * relation.
+ */
+#define XACT_FLAGS_WROTENONTEMPREL              (1U << 2)
+
 /*
  * start- and end-of-transaction callbacks for dynamically loaded modules
  */
-- 
2.23.0

From d21c72a7db85c2211504f60fca8d39c0bd0ee5a6 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Date: Thu, 5 Dec 2019 17:00:50 +0900
Subject: [PATCH v26 2/5] Support atomic commit among multiple foreign servers.

Original Author: Masahiko Sawada <sawada.mshk@gmail.com>
---
 src/backend/access/Makefile | 2 +-
 src/backend/access/fdwxact/Makefile | 17 +
 src/backend/access/fdwxact/README | 130 +
 src/backend/access/fdwxact/fdwxact.c | 2816 +++++++++++++++++
 src/backend/access/fdwxact/launcher.c | 644 ++++
 src/backend/access/fdwxact/resolver.c | 344 ++
 src/backend/access/rmgrdesc/Makefile | 1 +
 src/backend/access/rmgrdesc/fdwxactdesc.c | 58 +
 src/backend/access/rmgrdesc/xlogdesc.c | 6 +-
 src/backend/access/transam/rmgr.c | 1 +
 src/backend/access/transam/twophase.c | 42 +
 src/backend/access/transam/xact.c | 27 +-
 src/backend/access/transam/xlog.c | 34 +-
 src/backend/catalog/system_views.sql | 11 +
 src/backend/commands/copy.c | 6 +
 src/backend/commands/foreigncmds.c | 30 +
 src/backend/executor/execPartition.c | 8 +
 src/backend/executor/nodeForeignscan.c | 24 +
 src/backend/executor/nodeModifyTable.c | 18 +
 src/backend/foreign/foreign.c | 57 +
 src/backend/postmaster/bgworker.c | 8 +
 src/backend/postmaster/pgstat.c | 20 +
 src/backend/postmaster/postmaster.c | 15 +-
 src/backend/replication/logical/decode.c | 1 +
 src/backend/storage/ipc/ipci.c | 6 +
 src/backend/storage/ipc/procarray.c | 46 +
 src/backend/storage/lmgr/lwlocknames.txt | 3 +
 src/backend/storage/lmgr/proc.c | 8 +
 src/backend/tcop/postgres.c | 14 +
 src/backend/utils/misc/guc.c | 82 +
 src/backend/utils/misc/postgresql.conf.sample | 16 +
 src/backend/utils/probes.d | 2 +
 src/bin/initdb/initdb.c | 1 +
 src/bin/pg_controldata/pg_controldata.c | 2 +
 src/bin/pg_resetwal/pg_resetwal.c | 2 +
 src/bin/pg_waldump/fdwxactdesc.c | 1 +
 src/bin/pg_waldump/rmgrdesc.c | 1 +
 src/include/access/fdwxact.h | 165 +
 src/include/access/fdwxact_launcher.h | 29 +
 src/include/access/fdwxact_resolver.h | 23 +
 src/include/access/fdwxact_xlog.h | 54 +
 src/include/access/resolver_internal.h | 66 +
 src/include/access/rmgrlist.h | 1 +
 src/include/access/twophase.h | 1 +
 src/include/access/xact.h | 7 +
 src/include/access/xlog_internal.h | 1 +
 src/include/catalog/pg_control.h | 1 +
 src/include/catalog/pg_proc.dat | 29 +
 src/include/foreign/fdwapi.h | 12 +
 src/include/foreign/foreign.h | 1 +
 src/include/pgstat.h | 9 +-
 src/include/storage/proc.h | 11 +
 src/include/storage/procarray.h | 5 +
 src/include/utils/guc_tables.h | 3 +
 src/test/regress/expected/rules.out | 13 +
 55 files changed, 4917 insertions(+), 18 deletions(-)
 create mode 100644 src/backend/access/fdwxact/Makefile
 create mode 100644 src/backend/access/fdwxact/README
 create mode 100644 src/backend/access/fdwxact/fdwxact.c
 create mode 100644 src/backend/access/fdwxact/launcher.c
 create mode 100644 src/backend/access/fdwxact/resolver.c
 create mode 100644 src/backend/access/rmgrdesc/fdwxactdesc.c
 create mode 120000 src/bin/pg_waldump/fdwxactdesc.c
 create mode 100644 src/include/access/fdwxact.h
 create mode 100644 src/include/access/fdwxact_launcher.h
 create mode 100644 src/include/access/fdwxact_resolver.h
 create mode 100644 src/include/access/fdwxact_xlog.h
 create mode 100644 src/include/access/resolver_internal.h

diff --git a/src/backend/access/Makefile b/src/backend/access/Makefile
index 0880e0a8bb..49480dd039 100644
--- a/src/backend/access/Makefile
+++ b/src/backend/access/Makefile
@@ -9,6 +9,6 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 SUBDIRS     = brin common gin gist hash heap index nbtree rmgrdesc spgist \
-              table tablesample transam
+              table tablesample transam fdwxact
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/fdwxact/Makefile b/src/backend/access/fdwxact/Makefile
new file mode 100644
index 0000000000..0207a66fb4
--- /dev/null
+++ b/src/backend/access/fdwxact/Makefile
@@ -0,0 +1,17 @@
+#-------------------------------------------------------------------------
+#
+# Makefile--
+#    Makefile for access/fdwxact
+#
+# IDENTIFICATION
+#    src/backend/access/fdwxact/Makefile
+#
+#-------------------------------------------------------------------------
+
+subdir = src/backend/access/fdwxact
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+
+OBJS = fdwxact.o resolver.o launcher.o
+
+include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/fdwxact/README b/src/backend/access/fdwxact/README
new file mode 100644
index 0000000000..46ccb7eeae
--- /dev/null
+++ b/src/backend/access/fdwxact/README
@@ -0,0 +1,130 @@
+src/backend/access/fdwxact/README
+
+Atomic Commit for Distributed Transactions
+===========================================
+
+The atomic commit feature enables us to commit or roll back either all of the
+foreign servers or none of them. This ensures that the database data is always
+left in a consistent state in terms of the federated database.
+
+
+Commit Sequence of Global Transactions
+--------------------------------
+
+We employ the two-phase commit protocol to commit on all foreign servers
+atomically. The sequence of a distributed transaction commit consists of the
+following four steps:
+
+1. Foreign Server Registration
+During executor node initialization, accessed foreign servers are registered
+to the list FdwXactParticipants, which is maintained by PostgreSQL's global
+transaction manager (GTM), as distributed transaction participants. The
+registered foreign transactions are tracked until the end of the transaction.
+
+2. Pre-Commit phase (1st phase of two-phase commit)
+We write the corresponding WAL record indicating that the foreign server is
+involved in the current transaction before doing PREPARE on all foreign
+transactions. Thus, in case we lose connectivity to the foreign server or crash
+ourselves, we will remember that we might have prepared a transaction on the
+foreign server, and try to resolve it when connectivity is restored or after
+crash recovery.
+
+The two-phase commit is required only if the transaction modified two or more
+servers including the local node. In other cases, we can commit them at this
+step by calling the CommitForeignTransaction() API, and no further operation
+is needed.
+
+After that we prepare all foreign transactions by calling the
+PrepareForeignTransaction() API. If we fail on any of them we change to
+rollback; therefore, at this point some participants might be prepared whereas
+others are not. The former foreign transactions need to be resolved manually
+using pg_resolve_foreign_xact(), and the latter end the transaction in one
+phase by calling the RollbackForeignTransaction() API.
+
+3. Commit locally
+Once we've prepared all of them, commit the transaction locally.
+
+4. Post-Commit Phase (2nd phase of two-phase commit)
+The steps so far are done by the backend process committing the transaction,
+but this resolution step (commit or rollback) is done by the foreign
+transaction resolver process. The backend process inserts itself into the
+wait queue and then wakes up the resolver process (or requests a new one to be
+launched if necessary). The resolver process dequeues the waiter and fetches
+the distributed transaction information that the backend is waiting for. Once
+all foreign transactions are committed or rolled back, the resolver process
+wakes up the waiter.
+
+
+API Contract With Transaction Management Callback Functions
+-----------------------------------------------------------
+
+The core GTM manages the status of individual foreign transactions and calls
+transaction management callback functions according to that status. The
+callback functions PrepareForeignTransaction, CommitForeignTransaction and
+RollbackForeignTransaction are responsible for PREPARE, COMMIT and ROLLBACK of
+the transaction on the foreign server, respectively.
+FdwXactRslvState->flags can contain FDWXACT_FLAG_ONEPHASE, meaning the FDW can
+commit or roll back the foreign transaction in one phase. On failure while
+processing a foreign transaction, the FDW needs to raise an error. However,
+the FDW must accept an ERRCODE_UNDEFINED_OBJECT error while committing or
+rolling back a foreign transaction, because of a race condition: the
+coordinator could crash after the resolution has completed but before the WAL
+record removing the FdwXact entry is written.
+
+
+Foreign Transactions Status
+----------------------------
+
+Every foreign transaction has an FdwXact entry. When preparing a foreign
+transaction, an FdwXact entry whose status starts at FDWXACT_STATUS_INITIAL
+is created with WAL logging. The status changes to FDWXACT_STATUS_PREPARING,
+FDWXACT_STATUS_COMMITTING or FDWXACT_STATUS_ABORTING before the foreign
+transaction is prepared, committed or aborted by the FDW callback functions,
+respectively (*1), and changes to FDWXACT_STATUS_PREPARED after the foreign
+transaction is prepared. The status then changes to FDWXACT_STATUS_RESOLVED
+once the foreign transaction is resolved, and the corresponding FdwXact entry
+is then removed with WAL logging. If processing of a foreign transaction fails
+(i.e. while preparing, committing or aborting), the status changes back to the
+previous status. Therefore the FDWXACT_STATUS_xxxING statuses appear only
+while the foreign transaction is being processed by an FDW callback function.
+
+FdwXact entries recovered during recovery are marked as in-doubt if the
+corresponding local transaction is not a prepared transaction. Their initial
+status is FDWXACT_STATUS_PREPARED (*2): since the foreign transaction may have
+been in the middle of processing, we cannot know its exact status, so we
+regard it as PREPARED for safety.
+
+The foreign transaction status transitions are illustrated by the following
+graph of FdwXact->status:
+
+    +----------------------------------------------------+
+    |                      INVALID                       |
+    +----------------------------------------------------+
+       |                      |                       |
+       |                      v                       |
+       |           +---------------------+            |
+       |           |       INITIAL       |            |
+       |           +---------------------+            |
+      (*2)                    |                      (*2)
+       |                      v                       |
+       |           +---------------------+            |
+       |           |    PREPARING(*1)    |            |
+       |           +---------------------+            |
+       |                      |                       |
+       v                      v                       v
+    +----------------------------------------------------+
+    |                      PREPARED                      |
+    +----------------------------------------------------+
+              |                                |
+              v                                v
+    +--------------------+          +--------------------+
+    |   COMMITTING(*1)   |          |    ABORTING(*1)    |
+    +--------------------+          +--------------------+
+              |                                |
+              v                                v
+    +----------------------------------------------------+
+    |                      RESOLVED                      |
+    +----------------------------------------------------+
+
+(*1) Statuses that appear only while being processed by the FDW
+(*2) Paths for recovered FdwXact entries
diff --git a/src/backend/access/fdwxact/fdwxact.c b/src/backend/access/fdwxact/fdwxact.c
new file mode 100644
index 0000000000..058a416f81
--- /dev/null
+++ b/src/backend/access/fdwxact/fdwxact.c
@@ -0,0 +1,2816 @@
+/*-------------------------------------------------------------------------
+ *
+ * fdwxact.c
+ *      PostgreSQL global transaction manager for foreign servers.
+ *
+ * To achieve commit among all foreign servers atomically, we employ the
+ * two-phase commit protocol, which is a type of atomic commitment
+ * protocol (ACP). The basic strategy is that we prepare all of the remote
+ * transactions before committing locally and commit them after committing
+ * locally.
+ *
+ * During executor node initialization, a foreign server can be registered
+ * by calling either RegisterFdwXactByRelId() or RegisterFdwXactByServerId()
+ * to add it to the group for global commit. A foreign server is registered
+ * only if its FDW provides both the CommitForeignTransaction and
+ * RollbackForeignTransaction APIs. Registered participant servers are
+ * identified by the OIDs of the foreign server and the user.
+ *
+ * During pre-commit of the local transaction, we prepare the transaction on
+ * every foreign server. After committing or rolling back locally, we notify
+ * the resolver process and tell it to commit or roll back those
+ * transactions. If we ask it to commit, we also tell it to notify us when
+ * it's done, so that we can wait interruptibly for it to finish, and so
+ * that we're not trying to locally do work that might fail after foreign
+ * transactions are committed.
+ *
+ * The best-performing way to manage the waiting backends is to have a
+ * queue of waiting backends, so that we can avoid searching through all
+ * foreign transactions each time we receive a request. We have one queue
+ * whose elements are ordered by the timestamp at which they expect to be
+ * processed. Before waiting for foreign transactions to be resolved, the
+ * backend enqueues itself with the timestamp at which it expects to be
+ * processed. Similarly, if resolution fails, it enqueues again with a new
+ * timestamp (its timestamp + foreign_xact_resolution_interval).
+ *
+ * If a network failure or server crash occurs, or the user stops waiting,
+ * prepared foreign transactions are left in the in-doubt state (a.k.a.
+ * in-doubt transactions). Foreign transactions in the in-doubt state are not
+ * resolved automatically, so they must be processed manually using the
+ * pg_resolve_fdwxact() function.
+ *
+ * The two-phase commit protocol is required if the transaction modified two
+ * or more servers including itself. In other cases, all foreign transactions
+ * are committed or rolled back during pre-commit.
+ *
+ * LOCKING
+ *
+ * Whenever a foreign transaction is processed by an FDW, the corresponding
+ * FdwXact entry is updated. In order to protect the entry from concurrent
+ * removal we need to hold a lock on the entry or a lock on the entire global
+ * array. However, we don't want to hold the lock while the FDW is processing
+ * the foreign transaction, which may take an unpredictable amount of time.
+ * To avoid this, the in-memory data of foreign transactions follows a
+ * locking model based on four linked concepts:
+ *
+ * * A foreign transaction's status variable is switched using the LWLock
+ *   FdwXactLock, which needs to be held in exclusive mode when updating the
+ *   status, while readers need to hold it in shared mode when looking at the
+ *   status.
+ * * A process that is going to update an FdwXact entry cannot process a
+ *   foreign transaction that is being resolved.
+ * * So setting the status to FDWXACT_STATUS_PREPARING,
+ *   FDWXACT_STATUS_COMMITTING or FDWXACT_STATUS_ABORTING, which puts the
+ *   foreign transaction in an in-progress state, means owning the FdwXact
+ *   entry, which protects it from being updated or removed by concurrent
+ *   writers.
+ * * Individual fields are protected by a mutex; only the backend owning
+ *   the foreign transaction is authorized to update its fields.
+ *
+ * Therefore, before doing PREPARE, COMMIT PREPARED or ROLLBACK PREPARED, a
+ * process that is going to call transaction callback functions needs to
+ * change the status to the corresponding status above while holding
+ * FdwXactLock in exclusive mode, and call the callback function after
+ * releasing the lock.
+ *
+ * RECOVERY
+ *
+ * During WAL replay and replication, FdwXactCtl also holds information about
+ * active prepared foreign transactions that haven't been moved to disk yet.
+ *
+ * Replay of fdwxact records happens by the following rules:
+ *
+ * * At the beginning of recovery, pg_fdwxact is scanned once, filling FdwXact
+ *   with entries marked with fdwxact->inredo and fdwxact->ondisk. FdwXact file
+ *   data older than the XID horizon of the redo position are discarded.
+ * * On PREPARE redo, the foreign transaction is added to FdwXactCtl->fdwxacts.
+ *   We set fdwxact->inredo to true for such entries.
+ * * On checkpoint we iterate through FdwXactCtl->fdwxacts entries that
+ *   have fdwxact->inredo set and are behind the redo_horizon. We save
+ *   them to disk and then set fdwxact->ondisk to true.
+ * * On resolution we delete the entry from FdwXactCtl->fdwxacts. If
+ *   fdwxact->ondisk is true, the corresponding entry from the disk is
+ *   additionally deleted.
+ * * RecoverFdwXacts() and PrescanFdwXacts() have been modified to go through
+ *   fdwxact->inredo entries that have not made it to disk.
+ *
+ * These replay rules are borrowed from twophase.c
+ *
+ * Portions Copyright (c) 2019, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *    src/backend/access/fdwxact/fdwxact.c
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+#include "access/fdwxact.h"
+#include "access/fdwxact_resolver.h"
+#include "access/fdwxact_launcher.h"
+#include "access/fdwxact_xlog.h"
+#include "access/resolver_internal.h"
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "access/twophase.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xloginsert.h"
+#include "access/xlogutils.h"
+#include "catalog/pg_type.h"
+#include "foreign/fdwapi.h"
+#include "foreign/foreign.h"
+#include "funcapi.h"
+#include "libpq/pqsignal.h"
+#include "miscadmin.h"
+#include "parser/parsetree.h"
+#include "pg_trace.h"
+#include "pgstat.h"
+#include "storage/fd.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/lock.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/pmsignal.h"
+#include "storage/shmem.h"
+#include "tcop/tcopprot.h"
+#include "utils/builtins.h"
+#include "utils/guc.h"
+#include "utils/memutils.h"
+#include "utils/ps_status.h"
+#include "utils/rel.h"
+#include "utils/snapmgr.h"
+
+/* Atomic commit is enabled by configuration */
+#define IsForeignTwophaseCommitEnabled() \
+    (max_prepared_foreign_xacts > 0 && \
+     max_foreign_xact_resolvers > 0)
+
+/* Foreign twophase commit is enabled and requested by user */
+#define IsForeignTwophaseCommitRequested() \
+    (IsForeignTwophaseCommitEnabled() && \
+     (foreign_twophase_commit > FOREIGN_TWOPHASE_COMMIT_DISABLED))
+
+/* Check whether the FdwXactParticipant is capable of two-phase commit */
+#define IsSeverCapableOfTwophaseCommit(fdw_part) \
+    (((FdwXactParticipant *)(fdw_part))->prepare_foreign_xact_fn != NULL)
+
+/* Check whether the FdwXact is being resolved */
+#define FdwXactIsBeingResolved(fx) \
+    (((((FdwXact)(fx))->status) == FDWXACT_STATUS_PREPARING) || \
+     ((((FdwXact)(fx))->status) == FDWXACT_STATUS_COMMITTING) || \
+     ((((FdwXact)(fx))->status) == FDWXACT_STATUS_ABORTING))
+
+/*
+ * Structure to bundle the foreign transaction participant. This struct
+ * is created at the beginning of execution for each foreign server and
+ * is used until the end of the transaction, where we cannot look at
+ * syscaches. Therefore, this is allocated in the TopTransactionContext.
+ */
+typedef struct FdwXactParticipant
+{
+    /*
+     * Pointer to a FdwXact entry in the global array. NULL if the entry
+     * is not inserted yet but this is registered as a participant.
+     */
+    FdwXact     fdwxact;
+
+    /* Foreign server and user mapping info, passed to callback routines */
+    ForeignServer *server;
+    UserMapping *usermapping;
+
+    /* Transaction identifier used for PREPARE */
+    char       *fdwxact_id;
+
+    /* true if modified the data on the server */
+    bool        modified;
+
+    /* Callbacks for foreign transaction */
+    PrepareForeignTransaction_function prepare_foreign_xact_fn;
+    CommitForeignTransaction_function commit_foreign_xact_fn;
+    RollbackForeignTransaction_function rollback_foreign_xact_fn;
+    GetPrepareId_function get_prepareid_fn;
+} FdwXactParticipant;
+
+/*
+ * List of foreign transaction participants for atomic commit. This list
+ * has only foreign servers that provide transaction management callbacks,
+ * that is, CommitForeignTransaction and RollbackForeignTransaction.
+ */
+static List *FdwXactParticipants = NIL;
+static bool ForeignTwophaseCommitIsRequired = false;
+
+/* Directory where the foreign prepared transaction files will reside */
+#define FDWXACTS_DIR "pg_fdwxact"
+
+/*
+ * The name of a foreign prepared transaction file is the database OID, xid,
+ * foreign server OID and user OID, each as 8 hex digits, separated by '_'.
+ *
+ * Since an FdwXact state file is created per foreign transaction in a
+ * distributed transaction and the xid of an unresolved distributed
+ * transaction is never reused, the name is sufficient to ensure
+ * uniqueness.
+ */
+#define FDWXACT_FILE_NAME_LEN (8 + 1 + 8 + 1 + 8 + 1 + 8)
+#define FdwXactFilePath(path, dbid, xid, serverid, userid) \
+    snprintf(path, MAXPGPATH, FDWXACTS_DIR "/%08X_%08X_%08X_%08X", \
+             dbid, xid, serverid, userid)
+
+/* GUC parameters */
+int         max_prepared_foreign_xacts = 0;
+int         max_foreign_xact_resolvers = 0;
+int         foreign_twophase_commit = FOREIGN_TWOPHASE_COMMIT_DISABLED;
+
+/* Keep track of registering the process exit callback. */
+static bool fdwXactExitRegistered = false;
+
+static FdwXact FdwXactInsertFdwXactEntry(TransactionId xid,
+                                         FdwXactParticipant *fdw_part);
+static void FdwXactPrepareForeignTransactions(void);
+static void FdwXactOnePhaseEndForeignTransaction(FdwXactParticipant *fdw_part,
+                                                 bool for_commit);
+static void FdwXactResolveForeignTransaction(FdwXact fdwxact,
+                                             FdwXactRslvState *state,
+                                             FdwXactStatus fallback_status);
+static void FdwXactComputeRequiredXmin(void);
+static void FdwXactCancelWait(void);
+static void FdwXactRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn);
+static void FdwXactRedoRemove(Oid dbid, TransactionId xid, Oid serverid,
+                              Oid userid, bool give_warnings);
+static void FdwXactQueueInsert(PGPROC *waiter);
+static void AtProcExit_FdwXact(int code, Datum arg);
+static void ForgetAllFdwXactParticipants(void);
+static char *ReadFdwXactFile(Oid dbid, TransactionId xid, Oid serverid,
+                             Oid userid);
+static void RemoveFdwXactFile(Oid dbid, TransactionId xid, Oid serverid,
+                              Oid userid, bool giveWarning);
+static void RecreateFdwXactFile(Oid dbid, TransactionId xid, Oid serverid,
+                                Oid userid, void *content, int len);
+static void XlogReadFdwXactData(XLogRecPtr lsn, char **buf, int *len);
+static char *ProcessFdwXactBuffer(Oid dbid, TransactionId local_xid,
+                                  Oid serverid, Oid userid,
+                                  XLogRecPtr insert_start_lsn,
+                                  bool from_disk);
+static void FdwXactDetermineTransactionFate(FdwXact fdwxact, bool need_lock);
+static bool is_foreign_twophase_commit_required(void);
+static void register_fdwxact(Oid serverid, Oid userid, bool modified);
+static List *get_fdwxacts(Oid dbid, TransactionId xid, Oid serverid, Oid userid,
+                          bool including_indoubts, bool include_in_progress,
+                          bool need_lock);
+static FdwXact get_all_fdwxacts(int *num_p);
+static FdwXact insert_fdwxact(Oid dbid, TransactionId xid, Oid serverid,
+                              Oid userid, Oid umid, char *fdwxact_id);
+static char *get_fdwxact_identifier(FdwXactParticipant *fdw_part,
+                                    TransactionId xid);
+static void remove_fdwxact(FdwXact fdwxact);
+static FdwXact get_fdwxact_to_resolve(Oid dbid, TransactionId xid);
+static FdwXactRslvState *create_fdwxact_state(void);
+
+#ifdef USE_ASSERT_CHECKING
+static bool FdwXactQueueIsOrderedByTimestamp(void);
+#endif
+
+/*
+ * Remember accessed foreign transactions. Both RegisterFdwXactByRelId and
+ * RegisterFdwXactByServerId are called by the executor during initialization.
+ */
+void
+RegisterFdwXactByRelId(Oid relid, bool modified)
+{
+    Relation    rel;
+    Oid         serverid;
+    Oid         userid;
+
+    rel = relation_open(relid, NoLock);
+    serverid = GetForeignServerIdByRelId(relid);
+    userid = rel->rd_rel->relowner ? rel->rd_rel->relowner : GetUserId();
+    relation_close(rel, NoLock);
+
+    register_fdwxact(serverid, userid, modified);
+}
+
+void
+RegisterFdwXactByServerId(Oid serverid, bool modified)
+{
+    register_fdwxact(serverid, GetUserId(), modified);
+}
+
+/*
+ * Register the foreign transaction identified by the given arguments as
+ * a participant of the transaction.
+ *
+ * The foreign transaction is identified by the given server id and user id.
+ * Registered foreign transactions are managed by the global transaction
+ * manager until the end of the transaction.
+ */
+static void
+register_fdwxact(Oid serverid, Oid userid, bool modified)
+{
+    FdwXactParticipant *fdw_part;
+    ForeignServer *foreign_server;
+    UserMapping *user_mapping;
+    MemoryContext old_ctx;
+    FdwRoutine *routine;
+    ListCell   *lc;
+
+    foreach(lc, FdwXactParticipants)
+    {
+        FdwXactParticipant *fdw_part = (FdwXactParticipant *) lfirst(lc);
+
+        if (fdw_part->server->serverid == serverid &&
+            fdw_part->usermapping->userid == userid)
+        {
+            /* The foreign server is already registered, return */
+            fdw_part->modified |= modified;
+            return;
+        }
+    }
+
+    /*
+     * Participant's information is also needed at the end of a transaction,
+     * where system caches are not available. Save it in TopTransactionContext
+     * so that it can live until the end of the transaction.
+     */
+    old_ctx = MemoryContextSwitchTo(TopTransactionContext);
+    routine = GetFdwRoutineByServerId(serverid);
+
+    /*
+     * Don't register the foreign server if it doesn't provide both commit and
+     * rollback transaction management callbacks.
+     */
+    if (!routine->CommitForeignTransaction ||
+        !routine->RollbackForeignTransaction)
+    {
+        MyXactFlags |= XACT_FLAGS_FDWNOPREPARE;
+        pfree(routine);
+        /* Restore the caller's memory context before leaving */
+        MemoryContextSwitchTo(old_ctx);
+        return;
+    }
+
+    /*
+     * Remember we touched the foreign server that is not capable of two-phase
+     * commit.
+     */
+    if (!routine->PrepareForeignTransaction)
+        MyXactFlags |= XACT_FLAGS_FDWNOPREPARE;
+
+    foreign_server = GetForeignServer(serverid);
+    user_mapping = GetUserMapping(userid, serverid);
+
+
+    fdw_part = (FdwXactParticipant *) palloc(sizeof(FdwXactParticipant));
+
+    fdw_part->fdwxact_id = NULL;
+    fdw_part->server = foreign_server;
+    fdw_part->usermapping = user_mapping;
+    fdw_part->fdwxact = NULL;
+    fdw_part->modified = modified;
+    fdw_part->prepare_foreign_xact_fn = routine->PrepareForeignTransaction;
+    fdw_part->commit_foreign_xact_fn = routine->CommitForeignTransaction;
+    fdw_part->rollback_foreign_xact_fn = routine->RollbackForeignTransaction;
+    fdw_part->get_prepareid_fn = routine->GetPrepareId;
+
+    /* Add to the participants list */
+    FdwXactParticipants = lappend(FdwXactParticipants, fdw_part);
+
+    /* Revert back the context */
+    MemoryContextSwitchTo(old_ctx);
+}
+
+/*
+ * Calculates the size of shared memory allocated for maintaining foreign
+ * prepared transaction entries.
+ */
+Size
+FdwXactShmemSize(void)
+{
+    Size        size;
+
+    /* Size for foreign transaction information array */
+    size = offsetof(FdwXactCtlData, fdwxacts);
+    size = add_size(size, mul_size(max_prepared_foreign_xacts,
+                                   sizeof(FdwXact)));
+    size = MAXALIGN(size);
+    size = add_size(size, mul_size(max_prepared_foreign_xacts,
+                                   sizeof(FdwXactData)));
+
+    return size;
+}
+
+/*
+ * Initialization of shared memory for maintaining foreign prepared transaction
+ * entries. The shared memory layout is defined in the definition of the
+ * FdwXactCtlData structure.
+ */
+void
+FdwXactShmemInit(void)
+{
+    bool        found;
+
+    if (!fdwXactExitRegistered)
+    {
+        before_shmem_exit(AtProcExit_FdwXact, 0);
+        fdwXactExitRegistered = true;
+    }
+
+    FdwXactCtl = ShmemInitStruct("Foreign transactions table",
+                                 FdwXactShmemSize(),
+                                 &found);
+    if (!IsUnderPostmaster)
+    {
+        FdwXact     fdwxacts;
+        int         cnt;
+
+        Assert(!found);
+        FdwXactCtl->free_fdwxacts = NULL;
+        FdwXactCtl->num_fdwxacts = 0;
+
+        /* Initialize the linked list of free FDW transactions */
+        fdwxacts = (FdwXact)
+            ((char *) FdwXactCtl +
+             MAXALIGN(offsetof(FdwXactCtlData, fdwxacts) +
+                      sizeof(FdwXact) * max_prepared_foreign_xacts));
+        for (cnt = 0; cnt < max_prepared_foreign_xacts; cnt++)
+        {
+            fdwxacts[cnt].status = FDWXACT_STATUS_INVALID;
+            fdwxacts[cnt].fdwxact_free_next = FdwXactCtl->free_fdwxacts;
+            FdwXactCtl->free_fdwxacts = &fdwxacts[cnt];
+            SpinLockInit(&(fdwxacts[cnt].mutex));
+        }
+    }
+    else
+    {
+        Assert(FdwXactCtl);
+        Assert(found);
+    }
+}
+
+/*
+ * Prepare all foreign transactions if foreign twophase commit is required.
+ * If foreign twophase commit is required, the behavior depends on the value
+ * of foreign_twophase_commit: when 'required', we strictly require all
+ * foreign servers' FDWs to support the two-phase commit protocol and ask them
+ * to prepare foreign transactions; when 'prefer', we ask only foreign servers
+ * that are capable of two-phase commit to prepare foreign transactions and
+ * ask the other servers to commit; and for 'disabled', we ask all foreign
+ * servers to commit foreign transactions in one phase. If we fail to commit
+ * any of them, we change to aborting.
+ *
+ * Note that non-modified foreign servers can always be committed without
+ * preparation.
+ */
+void
+PreCommit_FdwXacts(void)
+{
+    bool        need_twophase_commit;
+    ListCell   *lc = NULL;
+
+    /* If there are no foreign servers involved, we have no business here */
+    if (FdwXactParticipants == NIL)
+        return;
+
+    /*
+     * We require all modified servers to be capable of the two-phase
+     * commit protocol.
+     */
+    if (foreign_twophase_commit == FOREIGN_TWOPHASE_COMMIT_REQUIRED &&
+        (MyXactFlags & XACT_FLAGS_FDWNOPREPARE) != 0)
+        ereport(ERROR,
+                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                 errmsg("cannot COMMIT a distributed transaction that has operated on a foreign server that doesn't support atomic commit")));
+
+    /*
+     * Check if we need to use foreign twophase commit. It's always false
+     * if foreign twophase commit is disabled.
+     */
+    need_twophase_commit = is_foreign_twophase_commit_required();
+
+    /*
+     * First, we consider committing foreign transactions in one phase.
+     */
+    foreach(lc, FdwXactParticipants)
+    {
+        FdwXactParticipant *fdw_part = (FdwXactParticipant *) lfirst(lc);
+        bool        commit = false;
+
+        /* Can commit in one phase if two-phase commit is not required */
+        if (!need_twophase_commit)
+            commit = true;
+
+        /* Non-modified foreign transactions can always be committed in one phase */
+        if (!fdw_part->modified)
+            commit = true;
+
+        /*
+         * In the 'prefer' case, servers that are not capable of two-phase
+         * commit can be committed in one phase.
+ */ + if (foreign_twophase_commit == FOREIGN_TWOPHASE_COMMIT_PREFER && + !IsSeverCapableOfTwophaseCommit(fdw_part)) + commit = true; + + if (commit) + { + /* Commit the foreign transaction in one phase */ + FdwXactOnePhaseEndForeignTransaction(fdw_part, true); + + /* Delete it from the participant list */ + FdwXactParticipants = foreach_delete_current(FdwXactParticipants, + lc); + continue; + } + } + + /* All done if we committed all foreign transactions */ + if (FdwXactParticipants == NIL) + return; + + /* + * Second, if only one foreign transaction remains in the participant list + * and we didn't modify local data, we can commit it without + * preparation. + */ + if (list_length(FdwXactParticipants) == 1 && + (MyXactFlags & XACT_FLAGS_WROTENONTEMPREL) == 0) + { + /* Commit the foreign transaction in one phase */ + FdwXactOnePhaseEndForeignTransaction(linitial(FdwXactParticipants), + true); + + /* All foreign transactions are committed; don't leave a dangling list */ + list_free(FdwXactParticipants); + FdwXactParticipants = NIL; + return; + } + + /* + * Finally, prepare foreign transactions. Note that we keep + * FdwXactParticipants until the end of the transaction. + */ + FdwXactPrepareForeignTransactions(); +} + +/* + * Insert FdwXact entries and prepare foreign transactions. Before inserting + * an FdwXact entry we call the get_preparedid callback to get a transaction + * identifier from the FDW. + * + * We can still switch to rollback here. If any error occurs, we roll back + * the non-prepared foreign transactions and leave the others to the resolver.
+ */ +static void +FdwXactPrepareForeignTransactions(void) +{ + ListCell *lcell; + TransactionId xid; + + if (FdwXactParticipants == NIL) + return; + + /* Parameter check */ + if (max_prepared_foreign_xacts == 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("prepared foreign transactions are disabled"), + errhint("Set max_prepared_foreign_transactions to a nonzero value."))); + + if (max_foreign_xact_resolvers == 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("prepared foreign transactions are disabled"), + errhint("Set max_foreign_transaction_resolvers to a nonzero value."))); + + xid = GetTopTransactionId(); + + /* Loop over the foreign connections */ + foreach(lcell, FdwXactParticipants) + { + FdwXactParticipant *fdw_part = (FdwXactParticipant *) lfirst(lcell); + FdwXactRslvState *state; + FdwXact fdwxact; + + fdw_part->fdwxact_id = get_fdwxact_identifier(fdw_part, xid); + + Assert(fdw_part->fdwxact_id); + + /* + * Insert the foreign transaction entry and mark it + * FDWXACT_STATUS_PREPARING. Registration persists this information to + * WAL, so that it is also replayed on standbys. Thus if we lose + * connectivity to the foreign server or crash ourselves, we will + * remember that we might have prepared a transaction on the foreign + * server and try to resolve it when connectivity is restored or after + * crash recovery. + * + * If we prepared the transaction on the foreign server before persisting + * the information and crashed in between these two steps, we would + * forget that we prepared the transaction on the foreign server and + * would not be able to resolve it after the crash. Hence persist + * first, then prepare.
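The ordering argument above can be checked with a toy model. This is only an illustration; `ToyState` and `survives_crash()` are made up and are not part of the patch. It runs "write the entry to WAL" and "PREPARE on the foreign server" in either order, simulates a crash after each step, and reports whether a prepared foreign transaction could be left unknown to us.

```c
#include <assert.h>
#include <stdbool.h>

/* Toy state: what survives a crash on each side */
typedef struct
{
    bool logged;          /* we have a durable (WAL-logged) entry */
    bool remote_prepared; /* the foreign server holds a prepared xact */
} ToyState;

/*
 * Run the two steps in the chosen order, crashing after 'steps_done' steps.
 * Returns true when the post-crash state is safe: every remotely prepared
 * transaction is known to us, so a resolver can find it after recovery.
 */
static bool
survives_crash(bool log_first, int steps_done)
{
    ToyState s = {false, false};

    for (int step = 0; step < steps_done && step < 2; step++)
    {
        /* step 0 is the logging step iff log_first */
        bool is_log_step = (step == 0) == log_first;

        if (is_log_step)
            s.logged = true;
        else
            s.remote_prepared = true;
    }

    /* An unlogged remote PREPARE would be orphaned forever */
    return !s.remote_prepared || s.logged;
}
```

Only the prepare-first order can fail, when the crash falls between the two steps. The opposite leftover, an entry whose foreign transaction was never prepared, is harmless, as the existing comment about resolving a never-prepared transaction explains.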
+ */ + fdwxact = FdwXactInsertFdwXactEntry(xid, fdw_part); + + state = create_fdwxact_state(); + state->server = fdw_part->server; + state->usermapping = fdw_part->usermapping; + state->fdwxact_id = pstrdup(fdw_part->fdwxact_id); + + /* Update the status */ + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + Assert(fdwxact->status == FDWXACT_STATUS_INITIAL); + fdwxact->status = FDWXACT_STATUS_PREPARING; + LWLockRelease(FdwXactLock); + + /* + * Prepare the foreign transaction. + * + * Between the FdwXactInsertFdwXactEntry call and the moment this backend + * receives the acknowledgement from the foreign server, the backend may + * abort the local transaction (say, because of a signal). + * + * During abort processing, we might try to resolve a never-prepared + * transaction, and get an error. This is fine as long as the FDW + * provides us with unique prepared transaction identifiers. + */ + PG_TRY(); + { + fdw_part->prepare_foreign_xact_fn(state); + } + PG_CATCH(); + { + /* failed, back to the initial state */ + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + fdwxact->status = FDWXACT_STATUS_INITIAL; + LWLockRelease(FdwXactLock); + + PG_RE_THROW(); + } + PG_END_TRY(); + + /* succeeded, update status */ + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + fdwxact->status = FDWXACT_STATUS_PREPARED; + LWLockRelease(FdwXactLock); + } +} + +/* + * Commit or roll back the given foreign transaction participant in one phase. + */ +static void +FdwXactOnePhaseEndForeignTransaction(FdwXactParticipant *fdw_part, + bool for_commit) +{ + FdwXactRslvState *state; + + Assert(fdw_part->commit_foreign_xact_fn); + Assert(fdw_part->rollback_foreign_xact_fn); + + state = create_fdwxact_state(); + state->server = fdw_part->server; + state->usermapping = fdw_part->usermapping; + state->flags = FDWXACT_FLAG_ONEPHASE; + + /* + * Commit or roll back the foreign transaction in one phase. Since we + * didn't insert an FdwXact entry for this transaction, we don't need to + * care about failures. On failure we switch to rollback.
+ */ + if (for_commit) + fdw_part->commit_foreign_xact_fn(state); + else + fdw_part->rollback_foreign_xact_fn(state); +} + +/* + * Create a new foreign transaction entry before the FDW prepares, commits or + * rolls back the foreign transaction. The entry is written to WAL and is + * persisted to disk under the pg_fdwxact directory at checkpoint time. + */ +static FdwXact +FdwXactInsertFdwXactEntry(TransactionId xid, FdwXactParticipant *fdw_part) +{ + FdwXact fdwxact; + FdwXactOnDiskData *fdwxact_file_data; + MemoryContext old_context; + int data_len; + + old_context = MemoryContextSwitchTo(TopTransactionContext); + + /* + * Enter the foreign transaction in the shared memory structure. + */ + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + fdwxact = insert_fdwxact(MyDatabaseId, xid, fdw_part->server->serverid, + fdw_part->usermapping->userid, + fdw_part->usermapping->umid, fdw_part->fdwxact_id); + fdwxact->status = FDWXACT_STATUS_INITIAL; + fdwxact->held_by = MyBackendId; + LWLockRelease(FdwXactLock); + + fdw_part->fdwxact = fdwxact; + MemoryContextSwitchTo(old_context); + + /* + * Prepare to write the entry to a file. Also add an xlog entry. The + * contents of the xlog record are the same as what is written to the file.
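The record built next is a variable-length struct whose trailing member holds the NUL-terminated identifier, so its size is the offsetof() of that member plus the string length plus one, rounded up with MAXALIGN. A minimal standalone sketch of that computation, assuming 8-byte maximum alignment (PostgreSQL's real MAXALIGN uses the platform's MAXIMUM_ALIGNOF) and a made-up `ToyOnDiskData` layout:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Assumed 8-byte maximum alignment; a stand-in for PostgreSQL's MAXALIGN */
#define TOY_MAXIMUM_ALIGNOF 8
#define TOY_MAXALIGN(len) \
    (((size_t) (len) + (TOY_MAXIMUM_ALIGNOF - 1)) & \
     ~((size_t) (TOY_MAXIMUM_ALIGNOF - 1)))

/* Hypothetical record with a trailing NUL-terminated identifier */
typedef struct
{
    uint32_t dbid;
    uint32_t local_xid;
    uint32_t serverid;
    uint32_t userid;
    uint32_t umid;
    char     fdwxact_id[];  /* flexible array member */
} ToyOnDiskData;

/* Size of a record holding 'id', padded the way the patch pads data_len */
static size_t
toy_record_size(const char *id)
{
    size_t len = offsetof(ToyOnDiskData, fdwxact_id) + strlen(id) + 1;

    return TOY_MAXALIGN(len);
}

/* Allocate a zero-filled record (like palloc0) and copy the id in */
static ToyOnDiskData *
toy_make_record(const char *id, uint32_t xid, size_t *len_out)
{
    size_t len = toy_record_size(id);
    ToyOnDiskData *rec = calloc(1, len);

    if (rec == NULL)
        return NULL;
    rec->local_xid = xid;
    memcpy(rec->fdwxact_id, id, strlen(id) + 1);
    *len_out = len;
    return rec;
}
```

On common ABIs the identifier starts at offset 20 here, so a 4-character identifier needs 25 bytes, which is padded to 32. Zero-filling the allocation keeps the padding bytes deterministic in the WAL record.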
+ */ + data_len = offsetof(FdwXactOnDiskData, fdwxact_id); + data_len = data_len + strlen(fdw_part->fdwxact_id) + 1; + data_len = MAXALIGN(data_len); + fdwxact_file_data = (FdwXactOnDiskData *) palloc0(data_len); + fdwxact_file_data->dbid = MyDatabaseId; + fdwxact_file_data->local_xid = xid; + fdwxact_file_data->serverid = fdw_part->server->serverid; + fdwxact_file_data->userid = fdw_part->usermapping->userid; + fdwxact_file_data->umid = fdw_part->usermapping->umid; + memcpy(fdwxact_file_data->fdwxact_id, fdw_part->fdwxact_id, + strlen(fdw_part->fdwxact_id) + 1); + + /* See note in RecordTransactionCommit */ + MyPgXact->delayChkpt = true; + + START_CRIT_SECTION(); + + /* Add the entry to the xlog and save LSN for checkpointer */ + XLogBeginInsert(); + XLogRegisterData((char *) fdwxact_file_data, data_len); + fdwxact->insert_end_lsn = XLogInsert(RM_FDWXACT_ID, XLOG_FDWXACT_INSERT); + XLogFlush(fdwxact->insert_end_lsn); + + /* If we crash now, we have prepared: WAL replay will fix things */ + + /* Store record's start location to read that later on CheckPoint */ + fdwxact->insert_start_lsn = ProcLastRecPtr; + + /* File is written completely, checkpoint can proceed with syncing */ + fdwxact->valid = true; + + /* Checkpoint can proceed now */ + MyPgXact->delayChkpt = false; + + END_CRIT_SECTION(); + + pfree(fdwxact_file_data); + return fdwxact; +} + +/* + * Insert a new entry for a given foreign transaction identified by transaction + * id, foreign server and user mapping, into the shared memory array. Caller + * must hold FdwXactLock in exclusive mode. + * + * If the entry already exists, the function raises an error.
+ */ +static FdwXact +insert_fdwxact(Oid dbid, TransactionId xid, Oid serverid, Oid userid, + Oid umid, char *fdwxact_id) +{ + int i; + FdwXact fdwxact; + + Assert(LWLockHeldByMeInMode(FdwXactLock, LW_EXCLUSIVE)); + + /* Check for a duplicate foreign transaction entry */ + for (i = 0; i < FdwXactCtl->num_fdwxacts; i++) + { + fdwxact = FdwXactCtl->fdwxacts[i]; + if (fdwxact->dbid == dbid && + fdwxact->local_xid == xid && + fdwxact->serverid == serverid && + fdwxact->userid == userid) + ereport(ERROR, (errmsg("could not insert a foreign transaction entry"), + errdetail("duplicate entry with transaction id %u, serverid %u, userid %u", + xid, serverid, userid))); + } + + /* + * Get the next free foreign transaction entry. Raise an error if there + * are none left. + */ + if (!FdwXactCtl->free_fdwxacts) + { + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("maximum number of foreign transactions reached"), + errhint("Increase max_prepared_foreign_transactions (currently %d).", + max_prepared_foreign_xacts))); + } + fdwxact = FdwXactCtl->free_fdwxacts; + FdwXactCtl->free_fdwxacts = fdwxact->fdwxact_free_next; + + /* Insert the entry into the shared memory array */ + Assert(FdwXactCtl->num_fdwxacts < max_prepared_foreign_xacts); + FdwXactCtl->fdwxacts[FdwXactCtl->num_fdwxacts++] = fdwxact; + + fdwxact->held_by = InvalidBackendId; + fdwxact->dbid = dbid; + fdwxact->local_xid = xid; + fdwxact->serverid = serverid; + fdwxact->userid = userid; + fdwxact->umid = umid; + fdwxact->insert_start_lsn = InvalidXLogRecPtr; + fdwxact->insert_end_lsn = InvalidXLogRecPtr; + fdwxact->valid = false; + fdwxact->ondisk = false; + fdwxact->inredo = false; + fdwxact->indoubt = false; + memcpy(fdwxact->fdwxact_id, fdwxact_id, strlen(fdwxact_id) + 1); + + return fdwxact; +} + +/* + * Remove the foreign prepared transaction entry from shared memory. + * Caller must hold FdwXactLock in exclusive mode.
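insert_fdwxact() and remove_fdwxact() keep two structures in sync: a free list threaded through the unused slots, and a dense array of pointers to the active ones, where removal moves the last active pointer into the vacated position so the array stays compact without shifting. A minimal single-process sketch of that scheme, with hypothetical names and without the locking and WAL logging of the patch:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define TOY_MAX_SLOTS 4   /* stands in for max_prepared_foreign_xacts */

typedef struct ToySlot
{
    struct ToySlot *free_next;  /* link while the slot is on the free list */
    unsigned        xid;        /* payload; 0 means unused here */
} ToySlot;

typedef struct
{
    ToySlot  slots[TOY_MAX_SLOTS];   /* backing storage */
    ToySlot *active[TOY_MAX_SLOTS];  /* dense array of in-use slots */
    int      num_active;
    ToySlot *free_list;
} ToyTable;

/* Push every slot onto the free list, as FdwXactShmemInit does */
static void
toy_init(ToyTable *t)
{
    t->num_active = 0;
    t->free_list = NULL;
    for (int i = 0; i < TOY_MAX_SLOTS; i++)
    {
        t->slots[i].xid = 0;
        t->slots[i].free_next = t->free_list;
        t->free_list = &t->slots[i];
    }
}

/* Pop a free slot and append it to the dense array; NULL when full */
static ToySlot *
toy_insert(ToyTable *t, unsigned xid)
{
    ToySlot *s = t->free_list;

    if (s == NULL)
        return NULL;
    t->free_list = s->free_next;
    s->xid = xid;
    t->active[t->num_active++] = s;
    return s;
}

/* Swap-remove from the dense array, then push the slot back on the free list */
static bool
toy_remove(ToyTable *t, ToySlot *s)
{
    int i;

    for (i = 0; i < t->num_active; i++)
        if (t->active[i] == s)
            break;
    if (i >= t->num_active)
        return false;           /* not found */
    t->active[i] = t->active[--t->num_active];
    s->xid = 0;
    s->free_next = t->free_list;
    t->free_list = s;
    return true;
}
```

The swap-remove changes the order of the active array, which is fine here because every lookup scans the whole array anyway.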
+ */ +static void +remove_fdwxact(FdwXact fdwxact) +{ + int i; + + Assert(fdwxact != NULL); + Assert(LWLockHeldByMeInMode(FdwXactLock, LW_EXCLUSIVE)); + + if (FdwXactIsBeingResolved(fdwxact)) + elog(ERROR, "cannot remove fdwxact entry that is being resolved"); + + /* Search the slot where this entry resides */ + for (i = 0; i < FdwXactCtl->num_fdwxacts; i++) + { + if (FdwXactCtl->fdwxacts[i] == fdwxact) + break; + } + + /* We did not find the given entry in the array */ + if (i >= FdwXactCtl->num_fdwxacts) + ereport(ERROR, + (errmsg("could not remove a foreign transaction entry"), + errdetail("failed to find entry for xid %u, foreign server %u, and user %u", + fdwxact->local_xid, fdwxact->serverid, fdwxact->userid))); + + elog(DEBUG2, "remove fdwxact entry id %s, xid %u db %u user %u", + fdwxact->fdwxact_id, fdwxact->local_xid, fdwxact->dbid, + fdwxact->userid); + + /* Remove the entry from the active array */ + FdwXactCtl->num_fdwxacts--; + FdwXactCtl->fdwxacts[i] = FdwXactCtl->fdwxacts[FdwXactCtl->num_fdwxacts]; + + /* Put it back into the free list */ + fdwxact->fdwxact_free_next = FdwXactCtl->free_fdwxacts; + FdwXactCtl->free_fdwxacts = fdwxact; + + /* Reset the entry's state */ + fdwxact->status = FDWXACT_STATUS_INVALID; + fdwxact->held_by = InvalidBackendId; + fdwxact->indoubt = false; + + if (!RecoveryInProgress()) + { + xl_fdwxact_remove record; + XLogRecPtr recptr; + + /* Fill up the log record before releasing the entry */ + record.serverid = fdwxact->serverid; + record.dbid = fdwxact->dbid; + record.xid = fdwxact->local_xid; + record.userid = fdwxact->userid; + + /* + * Now write the FdwXact state data to WAL. We have to set delayChkpt + * here, otherwise a checkpoint starting immediately after the + * WAL record is inserted could complete without fsync'ing our + * state file. (This is essentially the same kind of race condition + * as the COMMIT-to-clog-write case that RecordTransactionCommit + * uses delayChkpt for; see notes there.)
+ */ + START_CRIT_SECTION(); + + MyPgXact->delayChkpt = true; + + /* + * Log that we are removing the foreign transaction entry and + * remove the file from the disk as well. + */ + XLogBeginInsert(); + XLogRegisterData((char *) &record, sizeof(xl_fdwxact_remove)); + recptr = XLogInsert(RM_FDWXACT_ID, XLOG_FDWXACT_REMOVE); + XLogFlush(recptr); + + /* + * Now we can mark ourselves as out of the commit critical section: a + * checkpoint starting after this will certainly see the fdwxact entry as + * a candidate for fsyncing. + */ + MyPgXact->delayChkpt = false; + + END_CRIT_SECTION(); + } +} + +/* + * Return true and set ForeignTwophaseCommitIsRequired to true if the current + * transaction modified data on two or more servers, counting both the servers + * in FdwXactParticipants and the local server itself. + */ +static bool +is_foreign_twophase_commit_required(void) +{ + ListCell *lc; + int nserverswritten = 0; + + if (!IsForeignTwophaseCommitRequested()) + return false; + + foreach(lc, FdwXactParticipants) + { + FdwXactParticipant *fdw_part = (FdwXactParticipant *) lfirst(lc); + + if (fdw_part->modified) + nserverswritten++; + } + + if ((MyXactFlags & XACT_FLAGS_WROTENONTEMPREL) != 0) + ++nserverswritten; + + /* + * Atomic commit is required if we modified data on two or more + * participants. + */ + if (nserverswritten <= 1) + return false; + + ForeignTwophaseCommitIsRequired = true; + return true; +} + +bool +FdwXactIsForeignTwophaseCommitRequired(void) +{ + return ForeignTwophaseCommitIsRequired; +} + +/* + * Compute the oldest xmin across all unresolved foreign transactions + * and store it in the ProcArray.
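FdwXactComputeRequiredXmin() takes the minimum of the valid entries' local xids with TransactionIdPrecedes(), which compares xids modulo 2^32 rather than numerically. A standalone sketch of the same loop, with hypothetical names; the comparison is modeled on TransactionIdPrecedes: plain unsigned comparison when either xid is a special one below FirstNormalTransactionId (3), otherwise the sign of the 32-bit difference.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t ToyXid;

#define TOY_INVALID_XID       ((ToyXid) 0)
#define TOY_FIRST_NORMAL_XID  ((ToyXid) 3)

/* Modulo-2^32 "a precedes b", modeled on TransactionIdPrecedes */
static bool
toy_xid_precedes(ToyXid a, ToyXid b)
{
    if (a < TOY_FIRST_NORMAL_XID || b < TOY_FIRST_NORMAL_XID)
        return a < b;
    /* two's complement conversion, as PostgreSQL itself relies on */
    return (int32_t) (a - b) < 0;
}

/* Oldest xid among the valid entries, or TOY_INVALID_XID if there is none */
static ToyXid
toy_oldest_xid(const ToyXid *xids, const bool *valid, size_t n)
{
    ToyXid oldest = TOY_INVALID_XID;

    for (size_t i = 0; i < n; i++)
    {
        if (!valid[i])
            continue;           /* like skipping !fdwxact->valid entries */
        if (oldest == TOY_INVALID_XID || toy_xid_precedes(xids[i], oldest))
            oldest = xids[i];
    }
    return oldest;
}
```

As with TransactionIdPrecedes, the result is only meaningful while the compared xids lie within 2^31 of each other; an xid just before wraparound then counts as older than small xids assigned after it.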
+ */ +static void +FdwXactComputeRequiredXmin(void) +{ + int i; + TransactionId agg_xmin = InvalidTransactionId; + + Assert(FdwXactCtl != NULL); + + LWLockAcquire(FdwXactLock, LW_SHARED); + + for (i = 0; i < FdwXactCtl->num_fdwxacts; i++) + { + FdwXact fdwxact = FdwXactCtl->fdwxacts[i]; + + if (!fdwxact->valid) + continue; + + Assert(TransactionIdIsValid(fdwxact->local_xid)); + + if (!TransactionIdIsValid(agg_xmin) || + TransactionIdPrecedes(fdwxact->local_xid, agg_xmin)) + agg_xmin = fdwxact->local_xid; + } + + LWLockRelease(FdwXactLock); + + ProcArraySetFdwXactUnresolvedXmin(agg_xmin); +} + +/* + * Mark my foreign transaction participants as in-doubt and clear + * the FdwXactParticipants list. + * + * If we leave any foreign transactions behind, update the oldest xmin of + * unresolved transactions so that the local transaction IDs of in-doubt + * transactions are not truncated. + */ +static void +ForgetAllFdwXactParticipants(void) +{ + ListCell *cell; + int n_lefts = 0; + + if (FdwXactParticipants == NIL) + return; + + foreach(cell, FdwXactParticipants) + { + FdwXactParticipant *fdw_part = (FdwXactParticipant *) lfirst(cell); + FdwXact fdwxact = fdw_part->fdwxact; + + /* Nothing to do if we haven't registered an FdwXact entry yet */ + if (!fdw_part->fdwxact) + continue; + + /* + * There is a race condition: an FdwXact entry in FdwXactParticipants + * could already be used by another backend before we forget it, if + * the resolver process removed the entry and another backend reused + * it in the meantime. So we need to check whether the entries are + * still associated with our transaction. + */ + SpinLockAcquire(&fdwxact->mutex); + if (fdwxact->held_by == MyBackendId) + { + fdwxact->held_by = InvalidBackendId; + fdwxact->indoubt = true; + n_lefts++; + } + SpinLockRelease(&fdwxact->mutex); + } + + /* + * If we left any FdwXact entries, update the oldest local transaction of + * the unresolved distributed transactions and hand them over to the + * foreign transaction resolver.
+ */ + if (n_lefts > 0) + { + elog(DEBUG1, "left %d foreign transactions in in-doubt status", n_lefts); + FdwXactComputeRequiredXmin(); + } + + FdwXactParticipants = NIL; +} + +/* + * When the process exits, forget all the entries. + */ +static void +AtProcExit_FdwXact(int code, Datum arg) +{ + ForgetAllFdwXactParticipants(); +} + +void +FdwXactCleanupAtProcExit(void) +{ + if (!SHMQueueIsDetached(&(MyProc->fdwXactLinks))) + { + LWLockAcquire(FdwXactResolutionLock, LW_EXCLUSIVE); + SHMQueueDelete(&(MyProc->fdwXactLinks)); + LWLockRelease(FdwXactResolutionLock); + } +} + +/* + * Wait for the foreign transaction to be resolved. + * + * Backends start in state FDWXACT_NOT_WAITING and change it to + * FDWXACT_WAITING before adding themselves to the wait queue. + * In FdwXactResolveTransactionAndReleaseWaiter a foreign transaction resolver + * changes the state to FDWXACT_WAIT_COMPLETE once all foreign transactions + * are resolved. This backend then resets its state to FDWXACT_NOT_WAITING. + * If a resolver fails to resolve the waiting transaction, it re-inserts us + * into the wait queue with a later resolution time. + * + * This function is inspired by SyncRepWaitForLSN. + */ +void +FdwXactWaitToBeResolved(TransactionId wait_xid, bool is_commit) +{ + char *new_status = NULL; + const char *old_status; + + Assert(FdwXactCtl != NULL); + Assert(TransactionIdIsValid(wait_xid)); + Assert(SHMQueueIsDetached(&(MyProc->fdwXactLinks))); + Assert(MyProc->fdwXactState == FDWXACT_NOT_WAITING); + + /* Quick exit if atomic commit is not requested */ + if (!IsForeignTwophaseCommitRequested()) + return; + + /* + * Also, exit if the transaction itself has no foreign transaction + * participants.
+ */ + if (FdwXactParticipants == NIL && wait_xid == MyPgXact->xid) + return; + + /* Set backend status and enqueue ourselves in the wait queue */ + LWLockAcquire(FdwXactResolutionLock, LW_EXCLUSIVE); + MyProc->fdwXactState = FDWXACT_WAITING; + MyProc->fdwXactWaitXid = wait_xid; + MyProc->fdwXactNextResolutionTs = GetCurrentTransactionStopTimestamp(); + FdwXactQueueInsert(MyProc); + Assert(FdwXactQueueIsOrderedByTimestamp()); + LWLockRelease(FdwXactResolutionLock); + + /* Launch a resolver process if none is running yet, or wake it up */ + FdwXactLaunchOrWakeupResolver(); + + /* + * Alter ps display to show waiting for foreign transaction + * resolution. + */ + if (update_process_title) + { + int len; + + old_status = get_ps_display(&len); + /* room for " waiting for resolution " plus a 10-digit xid */ + new_status = (char *) palloc(len + 64); + memcpy(new_status, old_status, len); + snprintf(new_status + len, 64, " waiting for resolution %u", wait_xid); + set_ps_display(new_status, false); + new_status[len] = '\0'; /* truncate off "waiting ..." */ + } + + /* Wait for all foreign transactions to be resolved */ + for (;;) + { + /* Must reset the latch before testing state */ + ResetLatch(MyLatch); + + /* + * Acquiring the lock is not needed; the latch ensures proper + * barriers. If it looks like we're done, we must really be done, + * because once the resolver changes the state to FDWXACT_WAIT_COMPLETE, + * it will never update it again, so we can't be seeing a stale value + * in that case. + */ + if (MyProc->fdwXactState == FDWXACT_WAIT_COMPLETE) + break; + + /* + * If a wait for foreign transaction resolution is pending, we can + * neither acknowledge the commit nor raise ERROR or FATAL. The latter + * would lead the client to believe that the distributed transaction + * aborted, which is not true: it's already committed locally.
The + * former is no good either: the client has requested committing a + * distributed transaction, and is entitled to assume that an acknowledged + * commit is also committed on all foreign servers, which might not be + * true. So in this case we issue a WARNING (which some clients may + * be able to interpret) and shut off further output. We do NOT reset + * ProcDiePending, so that the process will die after the commit is + * cleaned up. + */ + if (ProcDiePending) + { + ereport(WARNING, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("canceling the wait for resolving foreign transaction and terminating connection due to administrator command"), + errdetail("The transaction has already committed locally, but might not have been committed on the foreign server."))); + whereToSendOutput = DestNone; + FdwXactCancelWait(); + break; + } + + /* + * If a query cancel interrupt arrives we just terminate the wait with + * a suitable warning. The foreign transactions can be orphaned but + * the foreign transaction resolver can pick them up and try to resolve + * them later. + */ + if (QueryCancelPending) + { + QueryCancelPending = false; + ereport(WARNING, + (errmsg("canceling wait for resolving foreign transaction due to user request"), + errdetail("The transaction has already committed locally, but might not have been committed on the foreign server."))); + FdwXactCancelWait(); + break; + } + + /* + * If the postmaster dies, we'll probably never get an + * acknowledgement, because all the foreign transaction resolver + * processes will exit. So just bail out. + */ + if (!PostmasterIsAlive()) + { + ProcDiePending = true; + whereToSendOutput = DestNone; + FdwXactCancelWait(); + break; + } + + /* + * Wait on latch. Any condition that should wake us up will set the + * latch, so no need for timeout.
+ */ + WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1, + WAIT_EVENT_FDWXACT_RESOLUTION); + } + + pg_read_barrier(); + + Assert(SHMQueueIsDetached(&(MyProc->fdwXactLinks))); + MyProc->fdwXactState = FDWXACT_NOT_WAITING; + + if (new_status) + { + set_ps_display(new_status, false); + pfree(new_status); + } +} + +/* + * Return true if there is at least one backend connected to database 'dbid' + * in the wait queue. The caller must hold FdwXactResolutionLock in shared + * mode. + */ +bool +FdwXactWaiterExists(Oid dbid) +{ + PGPROC *proc; + + Assert(LWLockHeldByMeInMode(FdwXactResolutionLock, LW_SHARED)); + + proc = (PGPROC *) SHMQueueNext(&(FdwXactRslvCtl->fdwxact_queue), + &(FdwXactRslvCtl->fdwxact_queue), + offsetof(PGPROC, fdwXactLinks)); + + while (proc) + { + if (proc->databaseId == dbid) + return true; + + proc = (PGPROC *) SHMQueueNext(&(FdwXactRslvCtl->fdwxact_queue), + &(proc->fdwXactLinks), + offsetof(PGPROC, fdwXactLinks)); + } + + return false; +} + +/* + * Insert the waiter into the wait queue in fdwXactNextResolutionTs order.
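FdwXactQueueInsert() keeps the wait queue sorted by fdwXactNextResolutionTs by walking backwards from the tail, which is cheap when a new waiter usually carries the latest timestamp. A standalone sketch with a sentinel-headed circular list standing in for SHM_QUEUE; all names are hypothetical. One difference worth noting: the patch stops at the first node with a strictly smaller timestamp, so a new waiter is placed ahead of existing waiters with an equal timestamp, while this sketch stops on `<=` and keeps equal timestamps in FIFO order. Which behavior is intended is a question for the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct ToyNode
{
    struct ToyNode *prev;
    struct ToyNode *next;
    int64_t         next_ts;    /* stands in for fdwXactNextResolutionTs */
} ToyNode;

/* Empty circular list: the sentinel points to itself */
static void
toy_queue_init(ToyNode *head)
{
    head->prev = head;
    head->next = head;
}

/* Link 'n' directly after 'pos' */
static void
toy_insert_after(ToyNode *pos, ToyNode *n)
{
    n->prev = pos;
    n->next = pos->next;
    pos->next->prev = n;
    pos->next = n;
}

/*
 * Walk backwards from the tail and insert 'n' after the last node whose
 * timestamp is <= n->next_ts; reaching the sentinel means "insert at front".
 */
static void
toy_queue_insert_sorted(ToyNode *head, ToyNode *n)
{
    ToyNode *p = head->prev;

    while (p != head && p->next_ts > n->next_ts)
        p = p->prev;
    toy_insert_after(p, n);
}

/* Check in the spirit of FdwXactQueueIsOrderedByTimestamp() */
static int
toy_queue_is_ordered(const ToyNode *head)
{
    int64_t last = INT64_MIN;

    for (const ToyNode *p = head->next; p != head; p = p->next)
    {
        if (p->next_ts < last)
            return 0;
        last = p->next_ts;
    }
    return 1;
}
```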
+ */ +static void +FdwXactQueueInsert(PGPROC *waiter) +{ + PGPROC *proc; + + Assert(LWLockHeldByMeInMode(FdwXactResolutionLock, LW_EXCLUSIVE)); + + proc = (PGPROC *) SHMQueuePrev(&(FdwXactRslvCtl->fdwxact_queue), + &(FdwXactRslvCtl->fdwxact_queue), + offsetof(PGPROC, fdwXactLinks)); + + while (proc) + { + if (proc->fdwXactNextResolutionTs < waiter->fdwXactNextResolutionTs) + break; + + proc = (PGPROC *) SHMQueuePrev(&(FdwXactRslvCtl->fdwxact_queue), + &(proc->fdwXactLinks), + offsetof(PGPROC, fdwXactLinks)); + } + + if (proc) + SHMQueueInsertAfter(&(proc->fdwXactLinks), &(waiter->fdwXactLinks)); + else + SHMQueueInsertAfter(&(FdwXactRslvCtl->fdwxact_queue), &(waiter->fdwXactLinks)); +} + +#ifdef USE_ASSERT_CHECKING +static bool +FdwXactQueueIsOrderedByTimestamp(void) +{ + PGPROC *proc; + TimestampTz lastTs; + + proc = (PGPROC *) SHMQueueNext(&(FdwXactRslvCtl->fdwxact_queue), + &(FdwXactRslvCtl->fdwxact_queue), + offsetof(PGPROC, fdwXactLinks)); + lastTs = 0; + + while (proc) + { + if (proc->fdwXactNextResolutionTs < lastTs) + return false; + + lastTs = proc->fdwXactNextResolutionTs; + + proc = (PGPROC *) SHMQueueNext(&(FdwXactRslvCtl->fdwxact_queue), + &(proc->fdwXactLinks), + offsetof(PGPROC, fdwXactLinks)); + } + + return true; +} +#endif + +/* + * Acquire FdwXactResolutionLock and cancel any wait currently in progress. + */ +static void +FdwXactCancelWait(void) +{ + LWLockAcquire(FdwXactResolutionLock, LW_EXCLUSIVE); + if (!SHMQueueIsDetached(&(MyProc->fdwXactLinks))) + SHMQueueDelete(&(MyProc->fdwXactLinks)); + MyProc->fdwXactState = FDWXACT_NOT_WAITING; + LWLockRelease(FdwXactResolutionLock); +} + +/* + * AtEOXact_FdwXacts + * + * On abort, roll back the foreign transactions that have not been prepared. + * In either case, forget all foreign transaction participants. 
+ */ +void +AtEOXact_FdwXacts(bool is_commit) +{ + ListCell *lcell; + + if (!is_commit) + { + foreach (lcell, FdwXactParticipants) + { + FdwXactParticipant *fdw_part = lfirst(lcell); + + /* + * If the foreign transaction has an FdwXact entry we might have + * prepared it.
Skip an already-prepared foreign transaction because + * its transaction has already been closed. But we cannot be sure + * whether a foreign transaction with status == FDWXACT_STATUS_PREPARING + * has been prepared or not, so we call the rollback API to close its + * transaction for safety. Any prepared foreign transaction that + * we might have left will be resolved by the foreign transaction + * resolver. + */ + if (fdw_part->fdwxact) + { + bool is_prepared; + + LWLockAcquire(FdwXactLock, LW_SHARED); + is_prepared = fdw_part->fdwxact && + fdw_part->fdwxact->status == FDWXACT_STATUS_PREPARED; + LWLockRelease(FdwXactLock); + + if (is_prepared) + continue; + } + + /* Roll back the foreign transaction in one phase */ + FdwXactOnePhaseEndForeignTransaction(fdw_part, false); + } + } + + /* + * On commit, we have already prepared the foreign transactions during + * the pre-commit phase, and these prepared transactions will be resolved + * by the resolver process. + */ + + ForgetAllFdwXactParticipants(); + ForeignTwophaseCommitIsRequired = false; +} + +/* + * Prepare foreign transactions. + * + * Note that the transaction may abort after we have prepared some of the + * participants. In this case we switch to rollback and roll back all + * foreign transactions. + */ +void +AtPrepare_FdwXacts(void) +{ + if (FdwXactParticipants == NIL) + return; + + /* Check for an invalid condition */ + if (!IsForeignTwophaseCommitRequested()) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot PREPARE a distributed transaction when foreign_twophase_commit is 'disabled'"))); + + /* + * We cannot prepare if any participating foreign server isn't capable + * of two-phase commit.
+ */ + if (is_foreign_twophase_commit_required() && + (MyXactFlags & XACT_FLAGS_FDWNOPREPARE) != 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot prepare the transaction because some foreign servers involved in the transaction cannot prepare transactions"))); + + /* Prepare transactions on participating foreign servers. */ + FdwXactPrepareForeignTransactions(); + + FdwXactParticipants = NIL; +} + +/* + * Return the first backend in the wait queue that is connected to my + * database and waiting for resolution, or NULL if there is none. + */ +PGPROC * +FdwXactGetWaiter(TimestampTz *nextResolutionTs_p, TransactionId *waitXid_p) +{ + PGPROC *proc; + + LWLockAcquire(FdwXactResolutionLock, LW_SHARED); + Assert(FdwXactQueueIsOrderedByTimestamp()); + + proc = (PGPROC *) SHMQueueNext(&(FdwXactRslvCtl->fdwxact_queue), + &(FdwXactRslvCtl->fdwxact_queue), + offsetof(PGPROC, fdwXactLinks)); + + while (proc) + { + if (proc->databaseId == MyDatabaseId) + break; + + proc = (PGPROC *) SHMQueueNext(&(FdwXactRslvCtl->fdwxact_queue), + &(proc->fdwXactLinks), + offsetof(PGPROC, fdwXactLinks)); + } + + if (proc) + { + *nextResolutionTs_p = proc->fdwXactNextResolutionTs; + *waitXid_p = proc->fdwXactWaitXid; + } + else + { + *nextResolutionTs_p = -1; + *waitXid_p = InvalidTransactionId; + } + + LWLockRelease(FdwXactResolutionLock); + + return proc; +} + +/* + * Get one FdwXact entry to resolve. This function is intended to be used + * by a resolver process fetching FdwXact entries to resolve, so the search + * excludes in-doubt and in-progress transactions. + */ +static FdwXact +get_fdwxact_to_resolve(Oid dbid, TransactionId xid) +{ + List *fdwxacts = NIL; + + Assert(LWLockHeldByMeInMode(FdwXactLock, LW_EXCLUSIVE)); + + /* Exclude both in-doubt and in-progress transactions */ + fdwxacts = get_fdwxacts(dbid, xid, InvalidOid, InvalidOid, + false, false, false); + + return fdwxacts == NIL ?
NULL : (FdwXact) linitial(fdwxacts); +} + +/* + * Resolve one distributed transaction on the given database. The target + * distributed transaction is fetched from the waiting queue and its transaction + * participants are fetched from the global array. + * + * Release the waiter after we have resolved all of the foreign transaction + * participants. On failure, we re-enqueue the waiting backend after + * incrementing its next resolution time. + */ +void +FdwXactResolveTransactionAndReleaseWaiter(Oid dbid, TransactionId xid, + PGPROC *waiter) +{ + FdwXact fdwxact; + + Assert(TransactionIdIsValid(xid)); + + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + + while ((fdwxact = get_fdwxact_to_resolve(dbid, xid)) != NULL) + { + FdwXactRslvState *state; + ForeignServer *server; + UserMapping *usermapping; + + CHECK_FOR_INTERRUPTS(); + + server = GetForeignServer(fdwxact->serverid); + usermapping = GetUserMapping(fdwxact->userid, fdwxact->serverid); + + state = create_fdwxact_state(); + SpinLockAcquire(&fdwxact->mutex); + state->server = server; + state->usermapping = usermapping; + state->fdwxact_id = pstrdup(fdwxact->fdwxact_id); + SpinLockRelease(&fdwxact->mutex); + + FdwXactDetermineTransactionFate(fdwxact, false); + + /* Do not hold the lock during foreign transaction resolution */ + LWLockRelease(FdwXactLock); + + PG_TRY(); + { + /* + * Resolve the foreign transaction. When committing or aborting + * prepared foreign transactions the previous status is always + * FDWXACT_STATUS_PREPARED. + */ + FdwXactResolveForeignTransaction(fdwxact, state, + FDWXACT_STATUS_PREPARED); + } + PG_CATCH(); + { + /* + * Failed to resolve. Re-insert the waiter into the wait queue with a + * later resolution time if it is still waiting.
+ */ + LWLockAcquire(FdwXactResolutionLock, LW_EXCLUSIVE); + if (waiter->fdwXactState == FDWXACT_WAITING) + { + SHMQueueDelete(&(waiter->fdwXactLinks)); + pg_write_barrier(); + waiter->fdwXactNextResolutionTs = + TimestampTzPlusMilliseconds(waiter->fdwXactNextResolutionTs, + foreign_xact_resolution_retry_interval); + FdwXactQueueInsert(waiter); + } + LWLockRelease(FdwXactResolutionLock); + + PG_RE_THROW(); + } + PG_END_TRY(); + + elog(DEBUG2, "resolved one foreign transaction xid %u, serverid %u, userid %u", + fdwxact->local_xid, fdwxact->serverid, fdwxact->userid); + + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + } + + LWLockRelease(FdwXactLock); + + LWLockAcquire(FdwXactResolutionLock, LW_EXCLUSIVE); + + /* + * Remove waiter from shmem queue, if not detached yet. The waiter + * could already be detached if the user cancelled the wait before + * resolution. + */ + if (!SHMQueueIsDetached(&(waiter->fdwXactLinks))) + { + TransactionId wait_xid = waiter->fdwXactWaitXid; + + SHMQueueDelete(&(waiter->fdwXactLinks)); + pg_write_barrier(); + + /* Set state to complete */ + waiter->fdwXactState = FDWXACT_WAIT_COMPLETE; + + /* Wake up the waiter only after setting its state and removing it from the queue */ + SetLatch(&(waiter->procLatch)); + + elog(DEBUG2, "released the proc with xid %u", wait_xid); + } + else + elog(DEBUG2, "the waiter backend had already been detached"); + + LWLockRelease(FdwXactResolutionLock); +} + +/* + * Determine whether the given foreign transaction should be committed or + * rolled back according to the result of the local transaction. This function + * changes fdwxact->status, so the caller must either hold FdwXactLock in + * exclusive mode or pass need_lock = true. + */ +static void +FdwXactDetermineTransactionFate(FdwXact fdwxact, bool need_lock) +{ + if (need_lock) + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + + /* + * The transaction being resolved must either have been cancelled and + * marked as in-doubt, or have been prepared.
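The decision made in FdwXactDetermineTransactionFate() can be summarized as a small truth table over the three checks it performs, in order. A hypothetical standalone sketch, with booleans standing in for TransactionIdDidCommit(), TransactionIdDidAbort() and TransactionIdIsInProgress() on the local xid:

```c
#include <assert.h>
#include <stdbool.h>

typedef enum { TOY_FATE_COMMIT, TOY_FATE_ABORT, TOY_FATE_UNDECIDED } ToyFate;

/*
 * Mirrors the if/else chain of FdwXactDetermineTransactionFate; the order
 * of the tests matters, exactly as in the patch.
 */
static ToyFate
toy_determine_fate(bool did_commit, bool did_abort, bool in_progress)
{
    if (did_commit)
        return TOY_FATE_COMMIT;
    if (did_abort)
        return TOY_FATE_ABORT;
    if (!in_progress)
        return TOY_FATE_ABORT;      /* never committed and no longer running */
    return TOY_FATE_UNDECIDED;      /* e.g. locally PREPAREd; the patch errors */
}
```

The third case is effectively presumed abort: a local transaction that neither committed nor is still running can no longer commit, so rolling back the foreign side is safe.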
+ */ + Assert(fdwxact->indoubt || + fdwxact->status == FDWXACT_STATUS_PREPARED); + + /* + * If the local transaction is already committed, commit the prepared + * foreign transaction. + */ + if (TransactionIdDidCommit(fdwxact->local_xid)) + fdwxact->status = FDWXACT_STATUS_COMMITTING; + + /* + * If the local transaction is already aborted, abort the prepared + * foreign transaction. + */ + else if (TransactionIdDidAbort(fdwxact->local_xid)) + fdwxact->status = FDWXACT_STATUS_ABORTING; + + /* + * The local transaction is neither committed nor in progress, and was + * not recorded as aborted. This can happen when the transaction failed + * after registering this entry but before actually preparing on the + * foreign server, in which case the foreign transaction might not be + * prepared at all. So let's assume it aborted. + */ + else if (!TransactionIdIsInProgress(fdwxact->local_xid)) + fdwxact->status = FDWXACT_STATUS_ABORTING; + + /* + * The local transaction is in progress and the foreign transaction is + * about to be committed or aborted. This should not happen except in one + * case: the local transaction is prepared and this foreign transaction + * is being resolved manually using pg_resolve_foreign_xact(). Raise an + * error anyway since we cannot determine the fate of this foreign + * transaction from the local transaction, whose fate is not determined + * either. + */ + else + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot resolve the foreign transaction associated with in-progress transaction %u on server %u", + fdwxact->local_xid, fdwxact->serverid), + errhint("The local transaction with xid %u might be prepared.", + fdwxact->local_xid))); + + if (need_lock) + LWLockRelease(FdwXactLock); +} + +/* + * Resolve the foreign transaction using the foreign data wrapper's transaction + * callback function. The 'state' is passed to the callback function. The fate + * of the foreign transaction must already be determined.
If the foreign transaction is resolved + * successfully, remove the FdwXact entry from shared memory and also + * remove the corresponding on-disk file. If resolution fails, the status of + * the FdwXact entry is reset to 'fallback_status' before erroring out. + */ +static void +FdwXactResolveForeignTransaction(FdwXact fdwxact, FdwXactRslvState *state, + FdwXactStatus fallback_status) +{ + ForeignServer *server; + ForeignDataWrapper *fdw; + FdwRoutine *fdw_routine; + bool is_commit; + + Assert(state != NULL); + Assert(state->server && state->usermapping && state->fdwxact_id); + Assert(fdwxact != NULL); + + LWLockAcquire(FdwXactLock, LW_SHARED); + + if (fdwxact->status != FDWXACT_STATUS_COMMITTING && + fdwxact->status != FDWXACT_STATUS_ABORTING) + elog(ERROR, "cannot resolve foreign transaction whose fate is not determined"); + + is_commit = fdwxact->status == FDWXACT_STATUS_COMMITTING; + LWLockRelease(FdwXactLock); + + server = GetForeignServer(fdwxact->serverid); + fdw = GetForeignDataWrapper(server->fdwid); + fdw_routine = GetFdwRoutine(fdw->fdwhandler); + + PG_TRY(); + { + if (is_commit) + fdw_routine->CommitForeignTransaction(state); + else + fdw_routine->RollbackForeignTransaction(state); + } + PG_CATCH(); + { + /* Back to the fallback status */ + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + fdwxact->status = fallback_status; + LWLockRelease(FdwXactLock); + + PG_RE_THROW(); + } + PG_END_TRY(); + + /* Resolution was a success, remove the entry */ + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + + elog(DEBUG1, "successfully %s the foreign transaction with xid %u db %u server %u user %u", + is_commit ?
"committed" : "rolled back", + fdwxact->local_xid, fdwxact->dbid, fdwxact->serverid, + fdwxact->userid); + + fdwxact->status = FDWXACT_STATUS_RESOLVED; + if (fdwxact->ondisk) + RemoveFdwXactFile(fdwxact->dbid, fdwxact->local_xid, + fdwxact->serverid, fdwxact->userid, + true); + remove_fdwxact(fdwxact); + LWLockRelease(FdwXactLock); +} + +/* + * Return palloc'd and initialized FdwXactRslvState. + */ +static FdwXactRslvState * +create_fdwxact_state(void) +{ + FdwXactRslvState *state; + + state = palloc(sizeof(FdwXactRslvState)); + state->server = NULL; + state->usermapping = NULL; + state->fdwxact_id = NULL; + state->flags = 0; + + return state; +} + +/* + * Return the FdwXact entry that matches the given arguments, or NULL if + * there is none. All arguments must be valid values so that the search + * finds exactly one entry (or none). Note that this function is intended to + * be used for modifying the returned FdwXact entry, so the caller must hold + * FdwXactLock in exclusive mode; in-progress FdwXact entries are not + * returned. + */ +static FdwXact +get_one_fdwxact(Oid dbid, TransactionId xid, Oid serverid, Oid userid) +{ + List *fdwxact_list; + + Assert(LWLockHeldByMeInMode(FdwXactLock, LW_EXCLUSIVE)); + + /* All search conditions must be valid values */ + Assert(TransactionIdIsValid(xid)); + Assert(OidIsValid(serverid)); + Assert(OidIsValid(userid)); + Assert(OidIsValid(dbid)); + + /* Include in-doubt transactions but don't include in-progress ones */ + fdwxact_list = get_fdwxacts(dbid, xid, serverid, userid, + true, false, false); + + /* At most one entry can match since we search by the unique key */ + Assert(list_length(fdwxact_list) <= 1); + + /* Could not find entry */ + if (fdwxact_list == NIL) + return NULL; + + return (FdwXact) linitial(fdwxact_list); +} + +/* + * Return true if there is at least one prepared foreign transaction + * which matches the given arguments.
+ */ +bool +fdwxact_exists(Oid dbid, Oid serverid, Oid userid) +{ + List *fdwxact_list; + + /* Find entries from all FdwXact entries */ + fdwxact_list = get_fdwxacts(dbid, InvalidTransactionId, serverid, + userid, true, true, true); + + return fdwxact_list != NIL; +} + +/* + * Returns an array of all prepared foreign transactions for the user-level + * function pg_foreign_xacts, and stores the number of entries in *num_p. + * + * WARNING -- we return even those transactions whose information is not + * completely filled yet. The caller should filter them out if it doesn't + * want them. + * + * The returned array is palloc'd. + */ +static FdwXact +get_all_fdwxacts(int *num_p) +{ + List *all_fdwxacts; + ListCell *lc; + FdwXact fdwxacts; + int num_fdwxacts = 0; + + Assert(num_p != NULL); + + /* Get all entries */ + all_fdwxacts = get_fdwxacts(InvalidOid, InvalidTransactionId, + InvalidOid, InvalidOid, true, + true, true); + + if (all_fdwxacts == NIL) + { + *num_p = 0; + return NULL; + } + + fdwxacts = (FdwXact) + palloc(sizeof(FdwXactData) * list_length(all_fdwxacts)); + *num_p = list_length(all_fdwxacts); + + /* Convert list to array of FdwXact */ + foreach(lc, all_fdwxacts) + { + FdwXact fx = (FdwXact) lfirst(lc); + + memcpy(fdwxacts + num_fdwxacts, fx, + sizeof(FdwXactData)); + num_fdwxacts++; + } + + list_free(all_fdwxacts); + + return fdwxacts; +} + +/* + * Return a list of FdwXact entries matching the given arguments, or NIL if + * none match. Only arguments holding valid values for their respective + * datatypes are used as search conditions. 'include_indoubt' and + * 'include_in_progress' control whether the result includes in-doubt + * transactions and in-progress transactions, respectively.
+ */ +static List* +get_fdwxacts(Oid dbid, TransactionId xid, Oid serverid, Oid userid, + bool include_indoubt, bool include_in_progress, bool need_lock) +{ + int i; + List *fdwxact_list = NIL; + + if (need_lock) + LWLockAcquire(FdwXactLock, LW_SHARED); + + for (i = 0; i < FdwXactCtl->num_fdwxacts; i++) + { + FdwXact fdwxact = FdwXactCtl->fdwxacts[i]; + + /* dbid */ + if (OidIsValid(dbid) && fdwxact->dbid != dbid) + continue; + + /* xid */ + if (TransactionIdIsValid(xid) && xid != fdwxact->local_xid) + continue; + + /* serverid */ + if (OidIsValid(serverid) && serverid != fdwxact->serverid) + continue; + + /* userid */ + if (OidIsValid(userid) && fdwxact->userid != userid) + continue; + + /* include in-doubt transaction? */ + if (!include_indoubt && fdwxact->indoubt) + continue; + + /* include in-progress transaction? */ + if (!include_in_progress && FdwXactIsBeingResolved(fdwxact)) + continue; + + /* Append it if matched */ + fdwxact_list = lappend(fdwxact_list, fdwxact); + } + + if (need_lock) + LWLockRelease(FdwXactLock); + + return fdwxact_list; +} + +/* Apply the redo log for a foreign transaction */ +void +fdwxact_redo(XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_FDWXACT_INSERT) + { + /* + * Add fdwxact entry and set start/end lsn of the WAL record + * in FdwXact entry. 
+ */ + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + FdwXactRedoAdd(XLogRecGetData(record), + record->ReadRecPtr, + record->EndRecPtr); + LWLockRelease(FdwXactLock); + } + else if (info == XLOG_FDWXACT_REMOVE) + { + xl_fdwxact_remove *xlrec = (xl_fdwxact_remove *) rec; + + /* Delete FdwXact entry and file if it exists */ + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + FdwXactRedoRemove(xlrec->dbid, xlrec->xid, xlrec->serverid, + xlrec->userid, false); + LWLockRelease(FdwXactLock); + } + else + elog(ERROR, "invalid log type %d in foreign transaction log record", info); + + return; +} + +/* + * Return a null-terminated foreign transaction identifier. If the given + * foreign server's FDW provides the getPrepareId callback, we return the + * identifier returned from it. Otherwise we generate a unique identifier in + * the form "fx_<random number>_<xid>_<serverid>_<userid>", whose length is + * less than FDWXACT_ID_MAX_LEN. + * + * The returned string is used to identify the foreign transaction. The + * identifier must not be the same as any other concurrent prepared + * transaction identifier. + * + * To make the foreign transaction id unique, we should ideally use something + * like UUID, which gives unique ids with high probability, but that may be + * expensive here, and the UUID extension which provides the function to + * generate UUIDs is not part of the core code. + */ +static char * +get_fdwxact_identifier(FdwXactParticipant *fdw_part, TransactionId xid) +{ + char *id; + int id_len = 0; + + if (!fdw_part->get_prepareid_fn) + { + char buf[FDWXACT_ID_MAX_LEN] = {0}; + + /* + * FDW doesn't provide the callback function, generate a unique + * identifier.
+ */ + snprintf(buf, FDWXACT_ID_MAX_LEN, "fx_%ld_%u_%u_%u", + Abs(random()), xid, fdw_part->server->serverid, + fdw_part->usermapping->userid); + + return pstrdup(buf); + } + + /* Get a unique identifier from the callback function */ + id = fdw_part->get_prepareid_fn(xid, fdw_part->server->serverid, + fdw_part->usermapping->userid, + &id_len); + + if (id == NULL) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + (errmsg("foreign transaction identifier is not provided")))); + + /* Check length of foreign transaction identifier */ + if (id_len >= FDWXACT_ID_MAX_LEN) + { + id[FDWXACT_ID_MAX_LEN] = '\0'; + ereport(ERROR, + (errcode(ERRCODE_NAME_TOO_LONG), + errmsg("foreign transaction identifier \"%s\" is too long", + id), + errdetail("Foreign transaction identifier must be less than %d characters.", + FDWXACT_ID_MAX_LEN))); + } + + id[id_len] = '\0'; + return pstrdup(id); +} + +/* + * We must fsync the foreign transaction state files that are valid or were + * generated during redo and have an inserted LSN <= the checkpoint's redo + * horizon. The foreign transaction entries and hence the corresponding files + * are expected to be very short-lived. By executing this function at the end, + * we might have fewer files to fsync, thus reducing some I/O. This is similar + * to CheckPointTwoPhase(). + * + * This is deliberately run as late as possible in the checkpoint sequence, + * because FdwXacts ordinarily have short lifespans, and so it is quite + * possible that FdwXacts that were valid at checkpoint start will no longer + * exist if we wait a little bit. With typical checkpoint settings this + * will be about 3 minutes for an online checkpoint, so as a result we + * expect that there will be no FdwXacts that need to be copied to disk. + * + * If a FdwXact remains valid across multiple checkpoints, it will already + * be on disk so we don't bother to repeat that write.
+ */ +void +CheckPointFdwXacts(XLogRecPtr redo_horizon) +{ + int cnt; + int serialized_fdwxacts = 0; + + if (max_prepared_foreign_xacts <= 0) + return; /* nothing to do */ + + TRACE_POSTGRESQL_FDWXACT_CHECKPOINT_START(); + + /* + * We are expecting there to be zero FdwXacts that need to be copied to + * disk, so we perform all I/O while holding FdwXactLock for simplicity. + * This prevents any new foreign xacts from preparing while this occurs, + * which shouldn't be a problem since the presence of long-lived prepared + * foreign xacts indicates the transaction manager isn't active. + * + * It's also possible to move I/O out of the lock, but on every error we + * should check whether somebody committed our transaction in a different + * backend. Let's leave this optimisation for the future, if somebody + * spots that this place causes a bottleneck. + * + * Note that it isn't possible for there to be an FdwXact with an + * insert_end_lsn set prior to the last checkpoint that is nonetheless + * marked invalid, because of the efforts with delayChkpt. + */ + LWLockAcquire(FdwXactLock, LW_SHARED); + for (cnt = 0; cnt < FdwXactCtl->num_fdwxacts; cnt++) + { + FdwXact fdwxact = FdwXactCtl->fdwxacts[cnt]; + + if ((fdwxact->valid || fdwxact->inredo) && + !fdwxact->ondisk && + fdwxact->insert_end_lsn <= redo_horizon) + { + char *buf; + int len; + + XlogReadFdwXactData(fdwxact->insert_start_lsn, &buf, &len); + RecreateFdwXactFile(fdwxact->dbid, fdwxact->local_xid, + fdwxact->serverid, fdwxact->userid, + buf, len); + fdwxact->ondisk = true; + fdwxact->insert_start_lsn = InvalidXLogRecPtr; + fdwxact->insert_end_lsn = InvalidXLogRecPtr; + pfree(buf); + serialized_fdwxacts++; + } + } + + LWLockRelease(FdwXactLock); + + /* + * Unconditionally flush the parent directory to make any information + * durable on disk. FdwXact files could have been removed and those + * removals need to be made persistent as well as any files newly created.
+ */ + fsync_fname(FDWXACTS_DIR, true); + + TRACE_POSTGRESQL_FDWXACT_CHECKPOINT_DONE(); + + if (log_checkpoints && serialized_fdwxacts > 0) + ereport(LOG, + (errmsg_plural("%u foreign transaction state file was written " + "for long-running prepared transactions", + "%u foreign transaction state files were written " + "for long-running prepared transactions", + serialized_fdwxacts, + serialized_fdwxacts))); +} + +/* + * Reads foreign transaction data from xlog. During checkpoint this data will + * be moved to fdwxact files and ReadFdwXactFile should be used instead. + * + * Note clearly that this function accesses WAL during normal operation, similarly + * to the way WALSender or Logical Decoding would do. It does not run during + * crash recovery or standby processing. + */ +static void +XlogReadFdwXactData(XLogRecPtr lsn, char **buf, int *len) +{ + XLogRecord *record; + XLogReaderState *xlogreader; + char *errormsg; + + xlogreader = XLogReaderAllocate(wal_segment_size, NULL, + &read_local_xlog_page, NULL); + if (!xlogreader) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"), + errdetail("Failed while allocating an XLog reading processor."))); + + record = XLogReadRecord(xlogreader, lsn, &errormsg); + if (record == NULL) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read foreign transaction state from xlog at %X/%X", + (uint32) (lsn >> 32), + (uint32) lsn))); + + if (XLogRecGetRmid(xlogreader) != RM_FDWXACT_ID || + (XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK) != XLOG_FDWXACT_INSERT) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("expected foreign transaction state data is not present in xlog at %X/%X", + (uint32) (lsn >> 32), + (uint32) lsn))); + + if (len != NULL) + *len = XLogRecGetDataLen(xlogreader); + + *buf = palloc(sizeof(char) * XLogRecGetDataLen(xlogreader)); + memcpy(*buf, XLogRecGetData(xlogreader), sizeof(char) * XLogRecGetDataLen(xlogreader)); + + XLogReaderFree(xlogreader); +} + +/* + * 
Recreates a foreign transaction state file. This is used in WAL replay + * and during checkpoint creation. + * + * Note: content and len don't include CRC. + */ +void +RecreateFdwXactFile(Oid dbid, TransactionId xid, Oid serverid, + Oid userid, void *content, int len) +{ + char path[MAXPGPATH]; + pg_crc32c statefile_crc; + int fd; + + /* Recompute CRC */ + INIT_CRC32C(statefile_crc); + COMP_CRC32C(statefile_crc, content, len); + FIN_CRC32C(statefile_crc); + + FdwXactFilePath(path, dbid, xid, serverid, userid); + + fd = OpenTransientFile(path, O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY); + + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not recreate foreign transaction state file \"%s\": %m", + path))); + + /* Write content and CRC */ + pgstat_report_wait_start(WAIT_EVENT_FDWXACT_FILE_WRITE); + errno = 0; + if (write(fd, content, len) != len) + { + /* if write didn't set errno, assume problem is no disk space */ + if (errno == 0) + errno = ENOSPC; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write foreign transaction state file \"%s\": %m", + path))); + } + errno = 0; + if (write(fd, &statefile_crc, sizeof(pg_crc32c)) != sizeof(pg_crc32c)) + { + if (errno == 0) + errno = ENOSPC; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not write foreign transaction state file \"%s\": %m", + path))); + } + pgstat_report_wait_end(); + + /* + * We must fsync the file because the end-of-replay checkpoint will not do + * so, there being no FDWXACT in shared memory yet to tell it to.
+ */ + pgstat_report_wait_start(WAIT_EVENT_FDWXACT_FILE_SYNC); + if (pg_fsync(fd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not fsync foreign transaction state file \"%s\": %m", + path))); + pgstat_report_wait_end(); + + if (CloseTransientFile(fd) != 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close foreign transaction state file \"%s\": %m", + path))); +} + +/* + * Given a database id, transaction id, server id and user id, read the + * foreign transaction state data either from disk or directly from WAL + * using the provided "insert_start_lsn". Returns NULL, after removing the + * entry, if the XID is too new. + */ +static char * +ProcessFdwXactBuffer(Oid dbid, TransactionId xid, Oid serverid, + Oid userid, XLogRecPtr insert_start_lsn, bool fromdisk) +{ + TransactionId origNextXid = + XidFromFullTransactionId(ShmemVariableCache->nextFullXid); + char *buf; + + Assert(LWLockHeldByMeInMode(FdwXactLock, LW_EXCLUSIVE)); + + if (!fromdisk) + Assert(!XLogRecPtrIsInvalid(insert_start_lsn)); + + /* Reject XID if too new */ + if (TransactionIdFollowsOrEquals(xid, origNextXid)) + { + if (fromdisk) + { + ereport(WARNING, + (errmsg("removing future fdwxact state file for xid %u, server %u and user %u", + xid, serverid, userid))); + RemoveFdwXactFile(dbid, xid, serverid, userid, true); + } + else + { + ereport(WARNING, + (errmsg("removing future fdwxact state from memory for xid %u, server %u and user %u", + xid, serverid, userid))); + FdwXactRedoRemove(dbid, xid, serverid, userid, true); + } + return NULL; + } + + if (fromdisk) + { + /* Read and validate file */ + buf = ReadFdwXactFile(dbid, xid, serverid, userid); + } + else + { + /* Read xlog data */ + XlogReadFdwXactData(insert_start_lsn, &buf, NULL); + } + + return buf; +} + +/* + * Read and validate the foreign transaction state file. + * + * If it looks OK (has a valid CRC and matches the expected identifiers), + * return the palloc'd contents of the file; raise an error when corrupted + * data is found. This state can be reached when doing recovery.
+ */ +static char * +ReadFdwXactFile(Oid dbid, TransactionId xid, Oid serverid, Oid userid) +{ + char path[MAXPGPATH]; + int fd; + FdwXactOnDiskData *fdwxact_file_data; + struct stat stat; + uint32 crc_offset; + pg_crc32c calc_crc; + pg_crc32c file_crc; + char *buf; + int r; + + FdwXactFilePath(path, dbid, xid, serverid, userid); + + fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); + if (fd < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open FDW transaction state file \"%s\": %m", + path))); + + /* + * Check file length. We can determine a lower bound pretty easily. We + * set an upper bound to avoid palloc() failure on a corrupt file, though + * we can't guarantee that we won't get an out of memory error anyway, + * even on a valid file. + */ + if (fstat(fd, &stat)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not stat FDW transaction state file \"%s\": %m", + path))); + + if (stat.st_size < (offsetof(FdwXactOnDiskData, fdwxact_id) + + sizeof(pg_crc32c)) || + stat.st_size > MaxAllocSize) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("incorrect size of FDW transaction state file \"%s\": %zu bytes", + path, (Size) stat.st_size))); + + crc_offset = stat.st_size - sizeof(pg_crc32c); + if (crc_offset != MAXALIGN(crc_offset)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("incorrect alignment of CRC offset for file \"%s\"", + path))); + + /* + * Ok, slurp in the file.
+ */ + buf = (char *) palloc(stat.st_size); + fdwxact_file_data = (FdwXactOnDiskData *) buf; + + /* Slurp the file */ + pgstat_report_wait_start(WAIT_EVENT_FDWXACT_FILE_READ); + r = read(fd, buf, stat.st_size); + if (r != stat.st_size) + { + if (r < 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", path))); + else + ereport(ERROR, + (errmsg("could not read file \"%s\": read %d of %zu", + path, r, (Size) stat.st_size))); + } + pgstat_report_wait_end(); + + if (CloseTransientFile(fd)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not close file \"%s\": %m", path))); + + /* + * Check the CRC. + */ + INIT_CRC32C(calc_crc); + COMP_CRC32C(calc_crc, buf, crc_offset); + FIN_CRC32C(calc_crc); + + file_crc = *((pg_crc32c *) (buf + crc_offset)); + + if (!EQ_CRC32C(calc_crc, file_crc)) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("calculated CRC checksum does not match value stored in file \"%s\"", + path))); + + /* Check that the contents match the expected entry */ + fdwxact_file_data = (FdwXactOnDiskData *) buf; + if (fdwxact_file_data->dbid != dbid || + fdwxact_file_data->serverid != serverid || + fdwxact_file_data->userid != userid || + fdwxact_file_data->local_xid != xid) + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid foreign transaction state file \"%s\"", + path))); + + return buf; +} + +/* + * Scan the shared memory entries of FdwXact and determine the range of valid + * XIDs present. This is run during database startup, after we have completed + * reading WAL. ShmemVariableCache->nextFullXid has been set to one more than + * the highest XID for which evidence exists in WAL. + * + * On corrupted foreign transaction state files, fail immediately. Keeping + * broken entries around and letting replay continue causes harm to the + * system, and a new backup should be rolled in. + * + * Our other responsibility is to update and return the oldest valid XID + * among the distributed transactions.
This is needed to synchronize pg_subtrans + * startup properly. + */ +TransactionId +PrescanFdwXacts(TransactionId oldestActiveXid) +{ + FullTransactionId nextFullXid = ShmemVariableCache->nextFullXid; + TransactionId origNextXid = XidFromFullTransactionId(nextFullXid); + TransactionId result = origNextXid; + int i; + + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + for (i = 0; i < FdwXactCtl->num_fdwxacts; i++) + { + FdwXact fdwxact = FdwXactCtl->fdwxacts[i]; + char *buf; + + buf = ProcessFdwXactBuffer(fdwxact->dbid, fdwxact->local_xid, + fdwxact->serverid, fdwxact->userid, + fdwxact->insert_start_lsn, fdwxact->ondisk); + + if (buf == NULL) + continue; + + if (TransactionIdPrecedes(fdwxact->local_xid, result)) + result = fdwxact->local_xid; + + pfree(buf); + } + LWLockRelease(FdwXactLock); + + return result; +} + +/* + * Scan pg_fdwxact and fill FdwXact depending on the on-disk data. + * This is called once at the beginning of recovery, saving any extra + * lookups in the future. FdwXact files that are newer than the + * minimum XID horizon are discarded on the way. 
+ */ +void +restoreFdwXactData(void) +{ + DIR *cldir; + struct dirent *clde; + + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + cldir = AllocateDir(FDWXACTS_DIR); + while ((clde = ReadDir(cldir, FDWXACTS_DIR)) != NULL) + { + if (strlen(clde->d_name) == FDWXACT_FILE_NAME_LEN && + strspn(clde->d_name, "0123456789ABCDEF_") == FDWXACT_FILE_NAME_LEN) + { + TransactionId local_xid; + Oid dbid; + Oid serverid; + Oid userid; + char *buf; + + sscanf(clde->d_name, "%08x_%08x_%08x_%08x", + &dbid, &local_xid, &serverid, &userid); + + /* Read fdwxact data from disk */ + buf = ProcessFdwXactBuffer(dbid, local_xid, serverid, userid, + InvalidXLogRecPtr, true); + + if (buf == NULL) + continue; + + /* Add this entry into the table of foreign transactions */ + FdwXactRedoAdd(buf, InvalidXLogRecPtr, InvalidXLogRecPtr); + } + } + + LWLockRelease(FdwXactLock); + FreeDir(cldir); +} + +/* + * Remove the foreign transaction file for given entry. + * + * If giveWarning is false, do not complain about file-not-present; + * this is an expected case during WAL replay. + */ +static void +RemoveFdwXactFile(Oid dbid, TransactionId xid, Oid serverid, Oid userid, + bool giveWarning) +{ + char path[MAXPGPATH]; + + FdwXactFilePath(path, dbid, xid, serverid, userid); + if (unlink(path) < 0 && (errno != ENOENT || giveWarning)) + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not remove foreign transaction state file \"%s\": %m", + path))); +} + +/* + * Store pointer to the start/end of the WAL record along with the xid in + * a fdwxact entry in shared memory FdwXactData structure. + */ +static void +FdwXactRedoAdd(char *buf, XLogRecPtr start_lsn, XLogRecPtr end_lsn) +{ + FdwXactOnDiskData *fdwxact_data = (FdwXactOnDiskData *) buf; + FdwXact fdwxact; + + Assert(LWLockHeldByMeInMode(FdwXactLock, LW_EXCLUSIVE)); + Assert(RecoveryInProgress()); + + /* + * Add this entry into the table of foreign transactions. 
Its + * status and in-doubt flag are set below, since we do not know the + * exact status right now. + */ + fdwxact = insert_fdwxact(fdwxact_data->dbid, fdwxact_data->local_xid, + fdwxact_data->serverid, fdwxact_data->userid, + fdwxact_data->umid, fdwxact_data->fdwxact_id); + + elog(DEBUG2, "added fdwxact entry in shared memory for foreign transaction, db %u xid %u server %u user %u id %s", + fdwxact_data->dbid, fdwxact_data->local_xid, + fdwxact_data->serverid, fdwxact_data->userid, + fdwxact_data->fdwxact_id); + + /* + * Set the status to PREPARED and mark the entry as in-doubt, since we do + * not know the xact status right now. The resolver will determine the + * fate later, based on the status of the local transaction that prepared + * this fdwxact entry. + */ + fdwxact->status = FDWXACT_STATUS_PREPARED; + fdwxact->insert_start_lsn = start_lsn; + fdwxact->insert_end_lsn = end_lsn; + fdwxact->inredo = true; /* added in redo */ + fdwxact->indoubt = true; + fdwxact->valid = false; + fdwxact->ondisk = XLogRecPtrIsInvalid(start_lsn); +} + +/* + * Remove the corresponding fdwxact entry from FdwXactCtl. Also remove + * the FdwXact file if the foreign transaction was saved via an earlier + * checkpoint. The FdwXact entry may not be found if crash recovery starts + * from a point after the entry was added but before it was removed.
+ */ +void +FdwXactRedoRemove(Oid dbid, TransactionId xid, Oid serverid, + Oid userid, bool givewarning) +{ + FdwXact fdwxact; + + Assert(LWLockHeldByMeInMode(FdwXactLock, LW_EXCLUSIVE)); + Assert(RecoveryInProgress()); + + fdwxact = get_one_fdwxact(dbid, xid, serverid, userid); + + if (fdwxact == NULL) + return; + + elog(DEBUG2, "removed fdwxact entry from shared memory for foreign transaction, db %u xid %u server %u user %u id %s", + fdwxact->dbid, fdwxact->local_xid, fdwxact->serverid, + fdwxact->userid, fdwxact->fdwxact_id); + + /* Clean up entry and any files we may have left */ + if (fdwxact->ondisk) + RemoveFdwXactFile(fdwxact->dbid, fdwxact->local_xid, + fdwxact->serverid, fdwxact->userid, + givewarning); + remove_fdwxact(fdwxact); +} + +/* + * Scan the shared memory entries of FdwXact and validate them. + * + * This is run at the end of recovery, but before we allow backends to write + * WAL. + */ +void +RecoverFdwXacts(void) +{ + int i; + + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + for (i = 0; i < FdwXactCtl->num_fdwxacts; i++) + { + FdwXact fdwxact = FdwXactCtl->fdwxacts[i]; + char *buf; + + buf = ProcessFdwXactBuffer(fdwxact->dbid, fdwxact->local_xid, + fdwxact->serverid, fdwxact->userid, + fdwxact->insert_start_lsn, fdwxact->ondisk); + + if (buf == NULL) + continue; + + ereport(LOG, + (errmsg("recovering foreign transaction %u for server %u and user %u from shared memory", + fdwxact->local_xid, fdwxact->serverid, fdwxact->userid))); + + /* recovered, so reset the flag for entries generated by redo */ + fdwxact->inredo = false; + fdwxact->valid = true; + + /* + * If the foreign transaction is part of the prepared local + * transaction, it's not in-doubt. The future COMMIT/ROLLBACK + * PREPARED can determine the fate of this foreign transaction.
+ */ + if (TwoPhaseExists(fdwxact->local_xid)) + { + ereport(DEBUG2, + (errmsg("clearing in-doubt flag of foreign transaction %u, server %u, user %u because the corresponding local prepared transaction was found", + fdwxact->local_xid, fdwxact->serverid, + fdwxact->userid))); + fdwxact->indoubt = false; + } + + pfree(buf); + } + LWLockRelease(FdwXactLock); +} + +bool +check_foreign_twophase_commit(int *newval, void **extra, GucSource source) +{ + ForeignTwophaseCommitLevel newForeignTwophaseCommitLevel = *newval; + + /* Parameter check */ + if (newForeignTwophaseCommitLevel > FOREIGN_TWOPHASE_COMMIT_DISABLED && + (max_prepared_foreign_xacts == 0 || max_foreign_xact_resolvers == 0)) + { + GUC_check_errdetail("Cannot enable \"foreign_twophase_commit\" when " + "\"max_prepared_foreign_transactions\" or \"max_foreign_transaction_resolvers\" " + "is zero."); + return false; + } + + return true; +} + +/* Built-in functions */ + +/* + * Structure to hold and iterate over the foreign transactions to be displayed + * by the built-in functions.
+ */ +typedef struct +{ + FdwXact fdwxacts; + int num_xacts; + int cur_xact; +} WorkingStatus; + +Datum +pg_foreign_xacts(PG_FUNCTION_ARGS) +{ +#define PG_PREPARED_FDWXACTS_COLS 7 + FuncCallContext *funcctx; + WorkingStatus *status; + char *xact_status; + + if (SRF_IS_FIRSTCALL()) + { + TupleDesc tupdesc; + MemoryContext oldcontext; + int num_fdwxacts = 0; + + /* create a function context for cross-call persistence */ + funcctx = SRF_FIRSTCALL_INIT(); + + /* + * Switch to memory context appropriate for multiple function calls + */ + oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); + + /* build tupdesc for result tuples */ + /* this had better match pg_fdwxacts view in system_views.sql */ + tupdesc = CreateTemplateTupleDesc(PG_PREPARED_FDWXACTS_COLS); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "dbid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "transaction", + XIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "serverid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "userid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "status", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "indoubt", + BOOLOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "identifier", + TEXTOID, -1, 0); + + funcctx->tuple_desc = BlessTupleDesc(tupdesc); + + /* + * Collect status information that we will format and send out as a + * result set. 
+ */ + status = (WorkingStatus *) palloc(sizeof(WorkingStatus)); + funcctx->user_fctx = (void *) status; + + status->fdwxacts = get_all_fdwxacts(&num_fdwxacts); + status->num_xacts = num_fdwxacts; + status->cur_xact = 0; + + MemoryContextSwitchTo(oldcontext); + } + + funcctx = SRF_PERCALL_SETUP(); + status = funcctx->user_fctx; + + while (status->cur_xact < status->num_xacts) + { + FdwXact fdwxact = &status->fdwxacts[status->cur_xact++]; + Datum values[PG_PREPARED_FDWXACTS_COLS]; + bool nulls[PG_PREPARED_FDWXACTS_COLS]; + HeapTuple tuple; + Datum result; + + if (!fdwxact->valid) + continue; + + /* + * Form tuple with appropriate data. + */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + values[0] = ObjectIdGetDatum(fdwxact->dbid); + values[1] = TransactionIdGetDatum(fdwxact->local_xid); + values[2] = ObjectIdGetDatum(fdwxact->serverid); + values[3] = ObjectIdGetDatum(fdwxact->userid); + + switch (fdwxact->status) + { + case FDWXACT_STATUS_INITIAL: + xact_status = "initial"; + break; + case FDWXACT_STATUS_PREPARING: + xact_status = "preparing"; + break; + case FDWXACT_STATUS_PREPARED: + xact_status = "prepared"; + break; + case FDWXACT_STATUS_COMMITTING: + xact_status = "committing"; + break; + case FDWXACT_STATUS_ABORTING: + xact_status = "aborting"; + break; + case FDWXACT_STATUS_RESOLVED: + xact_status = "resolved"; + break; + default: + xact_status = "unknown"; + break; + } + values[4] = CStringGetTextDatum(xact_status); + values[5] = BoolGetDatum(fdwxact->indoubt); + values[6] = PointerGetDatum(cstring_to_text_with_len(fdwxact->fdwxact_id, + strlen(fdwxact->fdwxact_id))); + + tuple = heap_form_tuple(funcctx->tuple_desc, values, nulls); + result = HeapTupleGetDatum(tuple); + SRF_RETURN_NEXT(funcctx, result); + } + + SRF_RETURN_DONE(funcctx); +} + +/* + * Built-in function to resolve a prepared foreign transaction manually. 
+ */ +Datum +pg_resolve_foreign_xact(PG_FUNCTION_ARGS) +{ + TransactionId xid = DatumGetTransactionId(PG_GETARG_DATUM(0)); + Oid serverid = PG_GETARG_OID(1); + Oid userid = PG_GETARG_OID(2); + ForeignServer *server; + UserMapping *usermapping; + FdwXact fdwxact; + FdwXactRslvState *state; + FdwXactStatus prev_status; + + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + (errmsg("must be superuser to resolve foreign transactions")))); + + server = GetForeignServer(serverid); + usermapping = GetUserMapping(userid, serverid); + state = create_fdwxact_state(); + + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + + fdwxact = get_one_fdwxact(MyDatabaseId, xid, serverid, userid); + + if (fdwxact == NULL) + { + LWLockRelease(FdwXactLock); + PG_RETURN_BOOL(false); + } + + state->server = server; + state->usermapping = usermapping; + state->fdwxact_id = pstrdup(fdwxact->fdwxact_id); + + SpinLockAcquire(&fdwxact->mutex); + prev_status = fdwxact->status; + SpinLockRelease(&fdwxact->mutex); + + FdwXactDetermineTransactionFate(fdwxact, false); + + LWLockRelease(FdwXactLock); + + FdwXactResolveForeignTransaction(fdwxact, state, prev_status); + + PG_RETURN_BOOL(true); +} + +/* + * Built-in function to remove a prepared foreign transaction entry without + * resolution. This gives a way to forget about such a prepared transaction + * when the foreign server where it is prepared is no longer available, or + * when the user who prepared it needs to be dropped.
+ */ +Datum +pg_remove_foreign_xact(PG_FUNCTION_ARGS) +{ + TransactionId xid = DatumGetTransactionId(PG_GETARG_DATUM(0)); + Oid serverid = PG_GETARG_OID(1); + Oid userid = PG_GETARG_OID(2); + FdwXact fdwxact; + + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + (errmsg("must be superuser to remove foreign transactions")))); + + LWLockAcquire(FdwXactLock, LW_EXCLUSIVE); + + fdwxact = get_one_fdwxact(MyDatabaseId, xid, serverid, userid); + + /* Remove the entry, if any, while still holding the lock */ + if (fdwxact != NULL) + remove_fdwxact(fdwxact); + + LWLockRelease(FdwXactLock); + + /* Report whether a matching entry was found */ + PG_RETURN_BOOL(fdwxact != NULL); +} diff --git a/src/backend/access/fdwxact/launcher.c b/src/backend/access/fdwxact/launcher.c new file mode 100644 index 0000000000..45fb530916 --- /dev/null +++ b/src/backend/access/fdwxact/launcher.c @@ -0,0 +1,644 @@ +/*------------------------------------------------------------------------- + * + * launcher.c + * + * The foreign transaction resolver launcher process starts foreign + * transaction resolver processes. The launcher schedules a resolver + * process to be started when a backend process requests one.
+ * + * Portions Copyright (c) 2019, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/access/fdwxact/launcher.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "funcapi.h" +#include "miscadmin.h" +#include "pgstat.h" + +#include "access/fdwxact.h" +#include "access/fdwxact_launcher.h" +#include "access/fdwxact_resolver.h" +#include "access/resolver_internal.h" +#include "commands/dbcommands.h" +#include "nodes/pg_list.h" +#include "postmaster/bgworker.h" +#include "storage/ipc.h" +#include "storage/proc.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" + +/* max sleep time between cycles (3min) */ +#define DEFAULT_NAPTIME_PER_CYCLE 180000L + +static void fdwxact_launcher_onexit(int code, Datum arg); +static void fdwxact_launcher_sighup(SIGNAL_ARGS); +static void fdwxact_launch_resolver(Oid dbid); +static bool fdwxact_relaunch_resolvers(void); + +static volatile sig_atomic_t got_SIGHUP = false; +static volatile sig_atomic_t got_SIGUSR2 = false; +FdwXactResolver *MyFdwXactResolver = NULL; + +/* + * Wake up the launcher process to retry resolution. + */ +void +FdwXactLauncherRequestToLaunchForRetry(void) +{ + if (FdwXactRslvCtl->launcher_pid != InvalidPid) + SetLatch(FdwXactRslvCtl->launcher_latch); +} + +/* + * Wake up the launcher process to request launching new resolvers + * immediately. + */ +void +FdwXactLauncherRequestToLaunch(void) +{ + if (FdwXactRslvCtl->launcher_pid != InvalidPid) + kill(FdwXactRslvCtl->launcher_pid, SIGUSR2); +} + +/* Report shared memory space needed by FdwXactRslvShmemInit */ +Size +FdwXactRslvShmemSize(void) +{ + Size size = 0; + + size = add_size(size, SizeOfFdwXactRslvCtlData); + size = add_size(size, mul_size(max_foreign_xact_resolvers, + sizeof(FdwXactResolver))); + + return size; +} + +/* + * Allocate and initialize foreign transaction resolver shared + * memory.
+ */ +void +FdwXactRslvShmemInit(void) +{ + bool found; + + FdwXactRslvCtl = ShmemInitStruct("Foreign transactions resolvers", + FdwXactRslvShmemSize(), + &found); + + if (!IsUnderPostmaster) + { + int slot; + + /* First time through, so initialize */ + MemSet(FdwXactRslvCtl, 0, FdwXactRslvShmemSize()); + + SHMQueueInit(&(FdwXactRslvCtl->fdwxact_queue)); + + for (slot = 0; slot < max_foreign_xact_resolvers; slot++) + { + FdwXactResolver *resolver = &FdwXactRslvCtl->resolvers[slot]; + + resolver->pid = InvalidPid; + resolver->dbid = InvalidOid; + resolver->in_use = false; + resolver->last_resolved_time = 0; + resolver->latch = NULL; + SpinLockInit(&(resolver->mutex)); + } + } +} + +/* + * Cleanup function for fdwxact launcher + * + * Called on fdwxact launcher exit. + */ +static void +fdwxact_launcher_onexit(int code, Datum arg) +{ + FdwXactRslvCtl->launcher_pid = InvalidPid; +} + +/* SIGHUP: set flag to reload configuration at next convenient time */ +static void +fdwxact_launcher_sighup(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGHUP = true; + + SetLatch(MyLatch); + + errno = save_errno; +} + +/* SIGUSR2: set flag to launch new resolver process immediately */ +static void +fdwxact_launcher_sigusr2(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGUSR2 = true; + SetLatch(MyLatch); + + errno = save_errno; +} + +/* + * Main loop for the fdwxact launcher process. 
+ */ +void +FdwXactLauncherMain(Datum main_arg) +{ + TimestampTz last_start_time = 0; + + ereport(DEBUG1, + (errmsg("fdwxact resolver launcher started"))); + + before_shmem_exit(fdwxact_launcher_onexit, (Datum) 0); + + Assert(FdwXactRslvCtl->launcher_pid == 0); + FdwXactRslvCtl->launcher_pid = MyProcPid; + FdwXactRslvCtl->launcher_latch = &MyProc->procLatch; + + pqsignal(SIGHUP, fdwxact_launcher_sighup); + pqsignal(SIGUSR2, fdwxact_launcher_sigusr2); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + BackgroundWorkerInitializeConnection(NULL, NULL, 0); + + /* Enter main loop */ + for (;;) + { + TimestampTz now; + long wait_time = DEFAULT_NAPTIME_PER_CYCLE; + int rc; + + CHECK_FOR_INTERRUPTS(); + ResetLatch(MyLatch); + + now = GetCurrentTimestamp(); + + /* + * Retry launching at most once per foreign_xact_resolution_retry_interval, + * but always launch immediately when a backend has requested it. + */ + if (got_SIGUSR2 || + TimestampDifferenceExceeds(last_start_time, now, + foreign_xact_resolution_retry_interval)) + { + MemoryContext oldctx; + MemoryContext subctx; + bool launched; + + if (got_SIGUSR2) + got_SIGUSR2 = false; + + subctx = AllocSetContextCreate(TopMemoryContext, + "Foreign Transaction Launcher", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(subctx); + + /* + * Launch foreign transaction resolvers that are requested + * but not running. + */ + launched = fdwxact_relaunch_resolvers(); + if (launched) + { + last_start_time = now; + wait_time = foreign_xact_resolution_retry_interval; + } + + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Free the temporary memory. 
*/ + MemoryContextDelete(subctx); + } + else + { + /* + * The wait in the previous cycle was interrupted less than + * foreign_xact_resolution_retry_interval after the last resolver + * started, which usually means the resolver crashed, so wait + * another foreign_xact_resolution_retry_interval before retrying.
+ */ + wait_time = foreign_xact_resolution_retry_interval; + } + + /* Wait for more work */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + wait_time, + WAIT_EVENT_FDWXACT_LAUNCHER_MAIN); + + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + } + + /* Not reachable */ +} + +/* + * Request launcher to launch a new foreign transaction resolver process + * or wake up the resolver if it's already running. + */ +void +FdwXactLaunchOrWakeupResolver(void) +{ + volatile FdwXactResolver *resolver; + bool found = false; + int i; + + /* + * Look for a resolver process that is running and working on the + * same database. + */ + LWLockAcquire(FdwXactResolverLock, LW_SHARED); + for (i = 0; i < max_foreign_xact_resolvers; i++) + { + resolver = &FdwXactRslvCtl->resolvers[i]; + + if (resolver->in_use && + resolver->dbid == MyDatabaseId) + { + found = true; + break; + } + } + LWLockRelease(FdwXactResolverLock); + + if (found) + { + /* Found the running resolver */ + elog(DEBUG1, + "found a running foreign transaction resolver process for database %u", + MyDatabaseId); + + /* + * Wake up the resolver. It's possible that the resolver is still starting + * up and hasn't attached to its slot yet. In that case there is nothing + * more to do, since the resolver will soon find the FdwXact entry we inserted. + */ + if (resolver->latch) + SetLatch(resolver->latch); + + return; + } + + /* Otherwise wake up the launcher to launch a new resolver */ + FdwXactLauncherRequestToLaunch(); +} + +/* + * Launch a foreign transaction resolver process that will connect to the given + * 'dbid'.
+ */ +static void +fdwxact_launch_resolver(Oid dbid) +{ + BackgroundWorker bgw; + BackgroundWorkerHandle *bgw_handle; + FdwXactResolver *resolver; + int unused_slot = -1; + int i; + + LWLockAcquire(FdwXactResolverLock, LW_EXCLUSIVE); + + /* Find an unused resolver slot */ + for (i = 0; i < max_foreign_xact_resolvers; i++) + { + FdwXactResolver *slot = &FdwXactRslvCtl->resolvers[i]; + + if (!slot->in_use) + { + unused_slot = i; + break; + } + } + + /* No unused slot found */ + if (unused_slot < 0) + ereport(ERROR, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("out of foreign transaction resolver slots"), + errhint("You might need to increase max_foreign_transaction_resolvers."))); + + resolver = &FdwXactRslvCtl->resolvers[unused_slot]; + resolver->in_use = true; + resolver->dbid = dbid; + LWLockRelease(FdwXactResolverLock); + + /* Register the new dynamic worker */ + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "FdwXactResolverMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "foreign transaction resolver for database %u", resolver->dbid); + snprintf(bgw.bgw_type, BGW_MAXLEN, "foreign transaction resolver"); + bgw.bgw_restart_time = BGW_NEVER_RESTART; + bgw.bgw_notify_pid = MyProcPid; + bgw.bgw_main_arg = Int32GetDatum(unused_slot); + + if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) + { + /* Failed to launch, clean up the worker slot */ + LWLockAcquire(FdwXactResolverLock, LW_EXCLUSIVE); + resolver->in_use = false; + LWLockRelease(FdwXactResolverLock); + + ereport(WARNING, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("out of background worker slots"), + errhint("You might need to increase max_worker_processes."))); + } + + /* + * We don't need to wait until it attaches here because
we're going to wait + * until all foreign transactions are resolved. + */ +} + +/* + * Launch or relaunch foreign transaction resolvers on databases that have + * at least one FdwXact entry but no running resolver. + */ +static bool +fdwxact_relaunch_resolvers(void) +{ + HTAB *resolver_dbs; /* DBs resolvers are running on */ + HTAB *fdwxact_dbs; /* DBs having at least one FdwXact entry */ + HASHCTL ctl; + HASH_SEQ_STATUS status; + Oid *entry; + bool launched = false; + int i; + + memset(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(Oid); + ctl.entrysize = sizeof(Oid); + ctl.hcxt = CurrentMemoryContext; /* freed along with the caller's context */ + resolver_dbs = hash_create("resolver dblist", + 32, &ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + fdwxact_dbs = hash_create("fdwxact dblist", + 32, &ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + + /* Collect OIDs of databases that have at least one non-in-doubt FdwXact entry */ + LWLockAcquire(FdwXactLock, LW_SHARED); + for (i = 0; i < FdwXactCtl->num_fdwxacts; i++) + { + FdwXact fdwxact = FdwXactCtl->fdwxacts[i]; + + if (fdwxact->indoubt) + continue; + + hash_search(fdwxact_dbs, &(fdwxact->dbid), HASH_ENTER, NULL); + } + LWLockRelease(FdwXactLock); + + /* There is no FdwXact entry, no need to launch a new one */ + if (hash_get_num_entries(fdwxact_dbs) == 0) + return false; + + /* Collect OIDs of databases on which resolvers are running */ + LWLockAcquire(FdwXactResolverLock, LW_SHARED); + for (i = 0; i < max_foreign_xact_resolvers; i++) + { + FdwXactResolver *resolver = &FdwXactRslvCtl->resolvers[i]; + + if (!resolver->in_use) + continue; + + hash_search(resolver_dbs, &(resolver->dbid), HASH_ENTER, NULL); + } + LWLockRelease(FdwXactResolverLock); + + /* Find DBs on which no resolver is running and launch a new one on each */ + hash_seq_init(&status, fdwxact_dbs); + while ((entry = (Oid *) hash_seq_search(&status)) != NULL) + { + bool found; + + hash_search(resolver_dbs, entry, HASH_FIND, &found); + + if (!found) + { + /* No resolver is 
running on this database, launch a new one */ + fdwxact_launch_resolver(*entry); + launched = true; + }
} + + return launched; +} + +/* + * FdwXactLauncherRegister + * Register a background worker running the foreign transaction + * launcher. + */ +void +FdwXactLauncherRegister(void) +{ + BackgroundWorker bgw; + + if (max_foreign_xact_resolvers == 0) + return; + + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "FdwXactLauncherMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "foreign transaction launcher"); + snprintf(bgw.bgw_type, BGW_MAXLEN, + "foreign transaction launcher"); + bgw.bgw_restart_time = 5; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&bgw); +} + +bool +IsFdwXactLauncher(void) +{ + return FdwXactRslvCtl->launcher_pid == MyProcPid; +} + +/* + * Stop the fdwxact resolver running on the given database. + */ +Datum +pg_stop_foreign_xact_resolver(PG_FUNCTION_ARGS) +{ + Oid dbid = PG_GETARG_OID(0); + FdwXactResolver *resolver = NULL; + int i; + + /* Must be superuser */ + if (!superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("permission denied to stop foreign transaction resolver"))); + + if (!OidIsValid(dbid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid database id"))); + + LWLockAcquire(FdwXactResolverLock, LW_SHARED); + + /* Find the running resolver process on the given database */ + for (i = 0; i < max_foreign_xact_resolvers; i++) + { + resolver = &FdwXactRslvCtl->resolvers[i]; + + /* found, and it has attached to its slot */ + if (resolver->in_use && resolver->dbid == dbid && resolver->pid != InvalidPid) + break; + } + + if (i >= max_foreign_xact_resolvers) + ereport(ERROR, + (errmsg("there is no running foreign transaction resolver process on database %u", + dbid))); + + /* Found the resolver, terminate it ... */ + kill(resolver->pid, SIGTERM); + + /* ...
and wait for it to die */ + for (;;) + { + int rc; + + /* is it gone? */ + if (!resolver->in_use) + break; + + LWLockRelease(FdwXactResolverLock); + + /* Wait a bit --- we don't expect to have to wait long. */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10L, WAIT_EVENT_BGWORKER_SHUTDOWN); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + LWLockAcquire(FdwXactResolverLock, LW_SHARED); + } + + LWLockRelease(FdwXactResolverLock); + + PG_RETURN_BOOL(true); +} + +/* + * Returns activity of all foreign transaction resolvers. + */ +Datum +pg_stat_get_foreign_xact(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_FDWXACT_RESOLVERS_COLS 3 + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + int i; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not " \ + "allowed in this context"))); + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + for (i = 0; i < max_foreign_xact_resolvers; i++) + { + FdwXactResolver *resolver = &FdwXactRslvCtl->resolvers[i]; + pid_t pid; + Oid dbid; + TimestampTz 
last_resolved_time; + Datum values[PG_STAT_GET_FDWXACT_RESOLVERS_COLS]; + bool nulls[PG_STAT_GET_FDWXACT_RESOLVERS_COLS]; + + + SpinLockAcquire(&(resolver->mutex)); + if (resolver->pid == InvalidPid) + { + SpinLockRelease(&(resolver->mutex)); + continue; + } + + pid = resolver->pid; + dbid = resolver->dbid; + last_resolved_time = resolver->last_resolved_time; + SpinLockRelease(&(resolver->mutex)); + + memset(nulls, 0, sizeof(nulls)); + /* pid */ + values[0] = Int32GetDatum(pid); + + /* dbid */ + values[1] = ObjectIdGetDatum(dbid); + + /* last_resolved_time */ + if (last_resolved_time == 0) + nulls[2] = true; + else + values[2] = TimestampTzGetDatum(last_resolved_time); + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} diff --git a/src/backend/access/fdwxact/resolver.c b/src/backend/access/fdwxact/resolver.c new file mode 100644 index 0000000000..9298877f10 --- /dev/null +++ b/src/backend/access/fdwxact/resolver.c @@ -0,0 +1,344 @@ +/*------------------------------------------------------------------------- + * + * resolver.c + * + * The foreign transaction resolver background worker resolves foreign + * transactions that participate in a distributed transaction. A resolver + * process is started by the foreign transaction launcher for each database. + * + * A resolver process keeps resolving the foreign transactions on its + * database that backend processes are waiting on. + * + * Normal termination is by SIGTERM, which instructs the resolver process + * to exit(0) at the next convenient moment. Emergency termination is by + * SIGQUIT, like any backend. The resolver process also exits on timeout, + * but only if there are no pending foreign transactions on the database + * waiting to be resolved.
+ * + * Portions Copyright (c) 2019, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/access/fdwxact/resolver.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include <signal.h> +#include <unistd.h> + +#include "access/fdwxact.h" +#include "access/fdwxact_launcher.h" +#include "access/fdwxact_resolver.h" +#include "access/resolver_internal.h" +#include "access/transam.h" +#include "access/xact.h" +#include "commands/dbcommands.h" +#include "funcapi.h" +#include "libpq/libpq.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "storage/ipc.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/timeout.h" +#include "utils/timestamp.h" + +/* max sleep time between cycles (3min) */ +#define DEFAULT_NAPTIME_PER_CYCLE 180000L + +/* GUC parameters */ +int foreign_xact_resolution_retry_interval; +int foreign_xact_resolver_timeout = 60 * 1000; +bool foreign_xact_resolve_indoubt_xacts; + +FdwXactRslvCtlData *FdwXactRslvCtl; + +static void FXRslvLoop(void); +static long FXRslvComputeSleepTime(TimestampTz now, TimestampTz nextResolutionTs); +static void FXRslvCheckTimeout(TimestampTz now); + +static void fdwxact_resolver_sighup(SIGNAL_ARGS); +static void fdwxact_resolver_onexit(int code, Datum arg); +static void fdwxact_resolver_detach(void); +static void fdwxact_resolver_attach(int slot); + +/* Flags set by signal handlers */ +static volatile sig_atomic_t got_SIGHUP = false; + +/* Set flag to reload configuration at next convenient time */ +static void +fdwxact_resolver_sighup(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGHUP = true; + + SetLatch(MyLatch); + + errno = save_errno; +} + +/* + * Detach the resolver and clean up the resolver info.
+ */ +static void +fdwxact_resolver_detach(void) +{ + /* Block concurrent access */ + LWLockAcquire(FdwXactResolverLock, LW_EXCLUSIVE); + + MyFdwXactResolver->pid = InvalidPid; + MyFdwXactResolver->in_use = false; + MyFdwXactResolver->dbid = InvalidOid; + + LWLockRelease(FdwXactResolverLock); +} + +/* + * Clean up foreign transaction resolver info. + */ +static void +fdwxact_resolver_onexit(int code, Datum arg) +{ + fdwxact_resolver_detach(); + + FdwXactLauncherRequestToLaunch(); +} + +/* + * Attach to a slot. + */ +static void +fdwxact_resolver_attach(int slot) +{ + /* Block concurrent access */ + LWLockAcquire(FdwXactResolverLock, LW_EXCLUSIVE); + + Assert(slot >= 0 && slot < max_foreign_xact_resolvers); + MyFdwXactResolver = &FdwXactRslvCtl->resolvers[slot]; + + if (!MyFdwXactResolver->in_use) + { + LWLockRelease(FdwXactResolverLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("foreign transaction resolver slot %d is empty, cannot attach", + slot))); + } + + Assert(OidIsValid(MyFdwXactResolver->dbid)); + + MyFdwXactResolver->pid = MyProcPid; + MyFdwXactResolver->latch = &MyProc->procLatch; + MyFdwXactResolver->last_resolved_time = 0; + + before_shmem_exit(fdwxact_resolver_onexit, (Datum) 0); + + LWLockRelease(FdwXactResolverLock); +} + +/* Foreign transaction resolver entry point */ +void +FdwXactResolverMain(Datum main_arg) +{ + int slot = DatumGetInt32(main_arg); + + /* Attach to a slot */ + fdwxact_resolver_attach(slot); + + /* Establish signal handlers */ + pqsignal(SIGHUP, fdwxact_resolver_sighup); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* Connect to our database */ + BackgroundWorkerInitializeConnectionByOid(MyFdwXactResolver->dbid, InvalidOid, 0); + + StartTransactionCommand(); + + ereport(LOG, + (errmsg("foreign transaction resolver for database \"%s\" has started", + get_database_name(MyFdwXactResolver->dbid)))); + + CommitTransactionCommand(); + + /* Initialize stats to a sane value */
+ MyFdwXactResolver->last_resolved_time = GetCurrentTimestamp(); + + /* Run the main loop */ + FXRslvLoop(); + + proc_exit(0); +} + +/* + * Fdwxact resolver main loop + */ +static void +FXRslvLoop(void) +{ + MemoryContext resolver_ctx; + + resolver_ctx = AllocSetContextCreate(TopMemoryContext, + "Foreign Transaction Resolver", + ALLOCSET_DEFAULT_SIZES); + + /* Enter main loop */ + for (;;) + { + PGPROC *waiter = NULL; + TransactionId waitXid = InvalidTransactionId; + TimestampTz resolutionTs = -1; + int rc; + TimestampTz now; + long sleep_time = DEFAULT_NAPTIME_PER_CYCLE; + + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + MemoryContextSwitchTo(resolver_ctx); + + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + + now = GetCurrentTimestamp(); + + /* + * Process waiters until either the queue is empty or we reach a waiter + * whose resolution time is in the future. + */ + while ((waiter = FdwXactGetWaiter(&resolutionTs, &waitXid)) != NULL) + { + CHECK_FOR_INTERRUPTS(); + Assert(TransactionIdIsValid(waitXid)); + + if (resolutionTs > now) + break; + + elog(DEBUG2, "resolver got one waiter with xid %u", waitXid); + + /* Resolve the waiting distributed transaction */ + StartTransactionCommand(); + FdwXactResolveTransactionAndReleaseWaiter(MyDatabaseId, waitXid, + waiter); + CommitTransactionCommand(); + + /* Update my stats */ + SpinLockAcquire(&(MyFdwXactResolver->mutex)); + MyFdwXactResolver->last_resolved_time = GetCurrentTimestamp(); + SpinLockRelease(&(MyFdwXactResolver->mutex)); + } + + FXRslvCheckTimeout(now); + + sleep_time = FXRslvComputeSleepTime(now, resolutionTs); + + MemoryContextResetAndDeleteChildren(resolver_ctx); + MemoryContextSwitchTo(TopMemoryContext); + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + sleep_time, + WAIT_EVENT_FDWXACT_RESOLVER_MAIN); + + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + } +} + +/* + * Check whether any foreign transaction has been resolved within
foreign_xact_resolver_timeout, and shut down if not. + */ +static void +FXRslvCheckTimeout(TimestampTz now) +{ + TimestampTz last_resolved_time; + TimestampTz timeout; + + if (foreign_xact_resolver_timeout == 0) + return; + + last_resolved_time = MyFdwXactResolver->last_resolved_time; + timeout = TimestampTzPlusMilliseconds(last_resolved_time, + foreign_xact_resolver_timeout); + + if (now < timeout) + return; + + LWLockAcquire(FdwXactResolutionLock, LW_SHARED); + if (!FdwXactWaiterExists(MyDatabaseId)) + { + StartTransactionCommand(); + ereport(LOG, + (errmsg("foreign transaction resolver for database \"%s\" will stop because of the timeout", + get_database_name(MyDatabaseId)))); + CommitTransactionCommand(); + + /* + * Keep holding FdwXactResolutionLock until the slot is detached. This is + * necessary to prevent a race condition in which a waiter enqueues itself + * after we have checked FdwXactWaiterExists. + */ + fdwxact_resolver_detach(); + LWLockRelease(FdwXactResolutionLock); + proc_exit(0); + } + else + elog(DEBUG2, "resolver reached the timeout but does not exit because the queue is not empty"); + + LWLockRelease(FdwXactResolutionLock); +} + +/* + * Compute how long we should sleep until the next cycle. We can sleep until the + * timeout or the next resolution time given by nextResolutionTs. + */ +static long +FXRslvComputeSleepTime(TimestampTz now, TimestampTz nextResolutionTs) +{ + long sleeptime = DEFAULT_NAPTIME_PER_CYCLE; + + if (foreign_xact_resolver_timeout > 0) + { + TimestampTz timeout; + long sec_to_timeout; + int microsec_to_timeout; + + /* Compute relative time until wakeup.
*/ + timeout = TimestampTzPlusMilliseconds(MyFdwXactResolver->last_resolved_time, + foreign_xact_resolver_timeout); + TimestampDifference(now, timeout, + &sec_to_timeout, &microsec_to_timeout); + + sleeptime = Min(sleeptime, + sec_to_timeout * 1000 + microsec_to_timeout / 1000); + } + + if (nextResolutionTs > 0) + { + long sec_to_timeout; + int microsec_to_timeout; + + TimestampDifference(now, nextResolutionTs, + &sec_to_timeout, &microsec_to_timeout); + + sleeptime = Min(sleeptime, + sec_to_timeout * 1000 + microsec_to_timeout / 1000); + } + + return sleeptime; +} + +bool +IsFdwXactResolver(void) +{ + return MyFdwXactResolver != NULL; +} diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index f88d72fd86..982c1a36cc 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -13,6 +13,7 @@ OBJS = \ clogdesc.o \ committsdesc.o \ dbasedesc.o \ + fdwxactdesc.o \ genericdesc.o \ gindesc.o \ gistdesc.o \ diff --git a/src/backend/access/rmgrdesc/fdwxactdesc.c b/src/backend/access/rmgrdesc/fdwxactdesc.c new file mode 100644 index 0000000000..fe0cef9472 --- /dev/null +++ b/src/backend/access/rmgrdesc/fdwxactdesc.c @@ -0,0 +1,58 @@ +/*------------------------------------------------------------------------- + * + * fdwxactdesc.c + * PostgreSQL global transaction manager for foreign servers. + * + * This module describes the WAL records for the foreign transaction manager.
+ * + * Portions Copyright (c) 2019, PostgreSQL Global Development Group + * + * src/backend/access/rmgrdesc/fdwxactdesc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/fdwxact_xlog.h" + +void +fdwxact_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_FDWXACT_INSERT) + { + FdwXactOnDiskData *fdwxact_insert = (FdwXactOnDiskData *) rec; + + appendStringInfo(buf, "server: %u,", fdwxact_insert->serverid); + appendStringInfo(buf, " user: %u,", fdwxact_insert->userid); + appendStringInfo(buf, " database: %u,", fdwxact_insert->dbid); + appendStringInfo(buf, " local xid: %u,", fdwxact_insert->local_xid); + appendStringInfo(buf, " id: %s", fdwxact_insert->fdwxact_id); + } + else + { + xl_fdwxact_remove *fdwxact_remove = (xl_fdwxact_remove *) rec; + + appendStringInfo(buf, "server: %u,", fdwxact_remove->serverid); + appendStringInfo(buf, " user: %u,", fdwxact_remove->userid); + appendStringInfo(buf, " database: %u,", fdwxact_remove->dbid); + appendStringInfo(buf, " local xid: %u", fdwxact_remove->xid); + } + +} + +const char * +fdwxact_identify(uint8 info) +{ + switch (info & ~XLR_INFO_MASK) + { + case XLOG_FDWXACT_INSERT: + return "NEW FOREIGN TRANSACTION"; + case XLOG_FDWXACT_REMOVE: + return "REMOVE FOREIGN TRANSACTION"; + } + /* Keep compiler happy */ + return NULL; +} diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c index 33060f3042..1d4e1c82e1 100644 --- a/src/backend/access/rmgrdesc/xlogdesc.c +++ b/src/backend/access/rmgrdesc/xlogdesc.c @@ -114,7 +114,8 @@ xlog_desc(StringInfo buf, XLogReaderState *record) appendStringInfo(buf, "max_connections=%d max_worker_processes=%d " "max_wal_senders=%d max_prepared_xacts=%d " "max_locks_per_xact=%d wal_level=%s " - "wal_log_hints=%s track_commit_timestamp=%s", + "wal_log_hints=%s 
track_commit_timestamp=%s " + "max_prepared_foreign_transactions=%d", xlrec.MaxConnections, xlrec.max_worker_processes, xlrec.max_wal_senders, @@ -122,7 +123,8 @@ xlog_desc(StringInfo buf, XLogReaderState *record) xlrec.max_locks_per_xact, wal_level_str, xlrec.wal_log_hints ? "on" : "off", - xlrec.track_commit_timestamp ? "on" : "off"); + xlrec.track_commit_timestamp ? "on" : "off", + xlrec.max_prepared_foreign_xacts); } else if (info == XLOG_FPW_CHANGE) { diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 58091f6b52..200cf9d067 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -10,6 +10,7 @@ #include "access/brin_xlog.h" #include "access/clog.h" #include "access/commit_ts.h" +#include "access/fdwxact.h" #include "access/generic_xlog.h" #include "access/ginxlog.h" #include "access/gistxlog.h" diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 529976885f..2c9af36bbb 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -77,6 +77,7 @@ #include <unistd.h> #include "access/commit_ts.h" +#include "access/fdwxact.h" #include "access/htup_details.h" #include "access/subtrans.h" #include "access/transam.h" @@ -850,6 +851,35 @@ TwoPhaseGetGXact(TransactionId xid, bool lock_held) return result; } +/* + * TwoPhaseExists + * Return true if there is a prepared transaction specified by XID + */ +bool +TwoPhaseExists(TransactionId xid) +{ + int i; + bool found = false; + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + + for (i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + PGXACT *pgxact = &ProcGlobal->allPgXact[gxact->pgprocno]; + + if (pgxact->xid == xid) + { + found = true; + break; + } + } + + LWLockRelease(TwoPhaseStateLock); + + return found; +} + /* * TwoPhaseGetDummyBackendId * Get the dummy backend ID for prepared transaction specified by XID @@ 
-2262,6 +2292,12 @@ RecordTransactionCommitPrepared(TransactionId xid, * in the procarray and continue to hold locks. */ SyncRepWaitForLSN(recptr, true); + + /* + * Wait for foreign transactions prepared as part of this prepared + * transaction to be committed. + */ + FdwXactWaitToBeResolved(xid, true); } /* @@ -2321,6 +2357,12 @@ RecordTransactionAbortPrepared(TransactionId xid, * in the procarray and continue to hold locks. */ SyncRepWaitForLSN(recptr, false); + + /* + * Wait for foreign transactions prepared as part of this prepared + * transaction to be rolled back. + */ + FdwXactWaitToBeResolved(xid, false); } /* diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 5353b6ab0b..5b67056c65 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -21,6 +21,7 @@ #include <unistd.h> #include "access/commit_ts.h" +#include "access/fdwxact.h" #include "access/multixact.h" #include "access/parallel.h" #include "access/subtrans.h" @@ -1218,6 +1219,7 @@ RecordTransactionCommit(void) SharedInvalidationMessage *invalMessages = NULL; bool RelcacheInitFileInval = false; bool wrote_xlog; + bool need_commit_globally; /* Get data needed for commit record */ nrels = smgrGetPendingDeletes(true, &rels); @@ -1226,6 +1228,7 @@ RecordTransactionCommit(void) nmsgs = xactGetCommittedInvalidationMessages(&invalMessages, &RelcacheInitFileInval); wrote_xlog = (XactLastRecEnd != 0); + need_commit_globally = FdwXactIsForeignTwophaseCommitRequired(); /* * If we haven't been assigned an XID yet, we neither can, nor do we want @@ -1264,12 +1267,13 @@ RecordTransactionCommit(void) } /* - * If we didn't create XLOG entries, we're done here; otherwise we - * should trigger flushing those entries the same as a commit record + * If we didn't create XLOG entries and the transaction does not need + * to be committed using two-phase commit,
we're done here; otherwise + * we should trigger flushing those entries the same as a commit record * would. This will primarily happen for HOT pruning and the like; we * want these to be flushed to disk in due time. */ - if (!wrote_xlog) + if (!wrote_xlog && !need_commit_globally) goto cleanup; } else @@ -1427,6 +1431,14 @@ RecordTransactionCommit(void) if (wrote_xlog && markXidCommitted) SyncRepWaitForLSN(XactLastRecEnd, true); + /* + * Wait for prepared foreign transactions to be resolved, if required. + * We only want to wait if we prepared foreign transactions in this + * transaction. + */ + if (need_commit_globally && markXidCommitted) + FdwXactWaitToBeResolved(xid, true); + /* remember end of last commit record */ XactLastCommitEnd = XactLastRecEnd; @@ -2086,6 +2098,10 @@ CommitTransaction(void) break; } + + /* Pre-commit step for foreign transactions */ + PreCommit_FdwXacts(); + CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT : XACT_EVENT_PRE_COMMIT); @@ -2246,6 +2262,7 @@ CommitTransaction(void) AtEOXact_PgStat(true, is_parallel_worker); AtEOXact_Snapshot(true, false); AtEOXact_ApplyLauncher(true); + AtEOXact_FdwXacts(true); pgstat_report_xact_timestamp(0); CurrentResourceOwner = NULL; @@ -2333,6 +2350,8 @@ PrepareTransaction(void) * the transaction-abort path.
*/ + AtPrepare_FdwXacts(); + /* Shut down the deferred-trigger manager */ AfterTriggerEndXact(true); @@ -2527,6 +2546,7 @@ PrepareTransaction(void) AtEOXact_Files(true); AtEOXact_ComboCid(); AtEOXact_HashTables(true); + AtEOXact_FdwXacts(true); /* don't call AtEOXact_PgStat here; we fixed pgstat state above */ AtEOXact_Snapshot(true, true); pgstat_report_xact_timestamp(0); @@ -2732,6 +2752,7 @@ AbortTransaction(void) AtEOXact_HashTables(false); AtEOXact_PgStat(false, is_parallel_worker); AtEOXact_ApplyLauncher(false); + AtEOXact_FdwXacts(false); pgstat_report_xact_timestamp(0); } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 6bc1a6b46d..428a974c51 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -24,6 +24,7 @@ #include "access/clog.h" #include "access/commit_ts.h" +#include "access/fdwxact.h" #include "access/heaptoast.h" #include "access/multixact.h" #include "access/rewriteheap.h" @@ -5246,6 +5247,7 @@ BootStrapXLOG(void) ControlFile->max_worker_processes = max_worker_processes; ControlFile->max_wal_senders = max_wal_senders; ControlFile->max_prepared_xacts = max_prepared_xacts; + ControlFile->max_prepared_foreign_xacts = max_prepared_foreign_xacts; ControlFile->max_locks_per_xact = max_locks_per_xact; ControlFile->wal_level = wal_level; ControlFile->wal_log_hints = wal_log_hints; @@ -6189,6 +6191,9 @@ CheckRequiredParameterValues(void) RecoveryRequiresIntParameter("max_wal_senders", max_wal_senders, ControlFile->max_wal_senders); + RecoveryRequiresIntParameter("max_prepared_foreign_transactions", + max_prepared_foreign_xacts, + ControlFile->max_prepared_foreign_xacts); RecoveryRequiresIntParameter("max_prepared_transactions", max_prepared_xacts, ControlFile->max_prepared_xacts); @@ -6729,14 +6734,15 @@ StartupXLOG(void) restoreTimeLineHistoryFiles(ThisTimeLineID, recoveryTargetTLI); /* - * Before running in recovery, scan pg_twophase and fill in its status to - * be able to work 
on entries generated by redo. Doing a scan before - * taking any recovery action has the merit to discard any 2PC files that - * are newer than the first record to replay, saving from any conflicts at - * replay. This avoids as well any subsequent scans when doing recovery - * of the on-disk two-phase data. + * Before running in recovery, scan pg_twophase and pg_fdwxact, and then + * fill in their status to be able to work on entries generated by redo. + * Doing a scan before taking any recovery action has the merit to discard + * any state files that are newer than the first record to replay, saving + * from any conflicts at replay. This avoids as well any subsequent scans + * when doing recovery of the on-disk two-phase or fdwxact data. */ restoreTwoPhaseData(); + restoreFdwXactData(); lastFullPageWrites = checkPoint.fullPageWrites; @@ -6928,7 +6934,10 @@ StartupXLOG(void) InitRecoveryTransactionEnvironment(); if (wasShutdown) + { oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids); + oldestActiveXID = PrescanFdwXacts(oldestActiveXID); + } else oldestActiveXID = checkPoint.oldestActiveXid; Assert(TransactionIdIsValid(oldestActiveXID)); @@ -7424,6 +7433,7 @@ StartupXLOG(void) * as potential problems are detected before any on-disk change is done. */ oldestActiveXID = PrescanPreparedTransactions(NULL, NULL); + oldestActiveXID = PrescanFdwXacts(oldestActiveXID); /* * Consider whether we need to assign a new timeline ID. @@ -7754,6 +7764,9 @@ StartupXLOG(void) /* Reload shared-memory state for prepared transactions */ RecoverPreparedTransactions(); + /* Load all foreign transaction entries from disk to memory */ + RecoverFdwXacts(); + /* * Shutdown the recovery environment.
This must occur after * RecoverPreparedTransactions(), see notes for lock_twophase_recover() @@ -9029,6 +9042,7 @@ CheckPointGuts(XLogRecPtr checkPointRedo, int flags) CheckPointReplicationOrigin(); /* We deliberately delay 2PC checkpointing as long as possible */ CheckPointTwoPhase(checkPointRedo); + CheckPointFdwXacts(checkPointRedo); } /* @@ -9462,8 +9476,9 @@ XLogReportParameters(void) max_worker_processes != ControlFile->max_worker_processes || max_wal_senders != ControlFile->max_wal_senders || max_prepared_xacts != ControlFile->max_prepared_xacts || + max_prepared_foreign_xacts != ControlFile->max_prepared_foreign_xacts || max_locks_per_xact != ControlFile->max_locks_per_xact || track_commit_timestamp != ControlFile->track_commit_timestamp) { /* * The change in number of backend slots doesn't need to be WAL-logged @@ -9481,6 +9496,7 @@ XLogReportParameters(void) xlrec.max_worker_processes = max_worker_processes; xlrec.max_wal_senders = max_wal_senders; xlrec.max_prepared_xacts = max_prepared_xacts; + xlrec.max_prepared_foreign_xacts = max_prepared_foreign_xacts; xlrec.max_locks_per_xact = max_locks_per_xact; xlrec.wal_level = wal_level; xlrec.wal_log_hints = wal_log_hints; @@ -9497,6 +9513,7 @@ XLogReportParameters(void) ControlFile->max_worker_processes = max_worker_processes; ControlFile->max_wal_senders = max_wal_senders; ControlFile->max_prepared_xacts = max_prepared_xacts; + ControlFile->max_prepared_foreign_xacts = max_prepared_foreign_xacts; ControlFile->max_locks_per_xact = max_locks_per_xact; ControlFile->wal_level = wal_level; ControlFile->wal_log_hints = wal_log_hints; @@ -9702,6 +9719,7 @@ xlog_redo(XLogReaderState *record) RunningTransactionsData running; oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids); + oldestActiveXID = PrescanFdwXacts(oldestActiveXID); /* * Construct a RunningTransactions
snapshot representing a shut @@ -9901,6 +9919,7 @@ xlog_redo(XLogReaderState *record) ControlFile->max_worker_processes = xlrec.max_worker_processes; ControlFile->max_wal_senders = xlrec.max_wal_senders; ControlFile->max_prepared_xacts = xlrec.max_prepared_xacts; + ControlFile->max_prepared_foreign_xacts = xlrec.max_prepared_foreign_xacts; ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact; ControlFile->wal_level = xlrec.wal_level; ControlFile->wal_log_hints = xlrec.wal_log_hints; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index f7800f01a6..b4c1cce1f0 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -332,6 +332,9 @@ CREATE VIEW pg_prepared_xacts AS CREATE VIEW pg_prepared_statements AS SELECT * FROM pg_prepared_statement() AS P; +CREATE VIEW pg_foreign_xacts AS + SELECT * FROM pg_foreign_xacts() AS F; + CREATE VIEW pg_seclabels AS SELECT l.objoid, l.classoid, l.objsubid, @@ -818,6 +821,14 @@ CREATE VIEW pg_stat_subscription AS LEFT JOIN pg_stat_get_subscription(NULL) st ON (st.subid = su.oid); +CREATE VIEW pg_stat_foreign_xact AS + SELECT + r.pid, + r.dbid, + r.last_resolved_time + FROM pg_stat_get_foreign_xact() r + WHERE r.pid IS NOT NULL; + CREATE VIEW pg_stat_ssl AS SELECT S.pid, diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 42a147b67d..e3caef7ef9 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -2857,8 +2857,14 @@ CopyFrom(CopyState cstate) if (resultRelInfo->ri_FdwRoutine != NULL && resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL) + { + /* Remember that the transaction modifies data on a foreign server */ + RegisterFdwXactByRelId(RelationGetRelid(resultRelInfo->ri_RelationDesc), + true); + resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, resultRelInfo); + } /* Prepare to catch AFTER triggers.
*/ AfterTriggerBeginQuery(); diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c index 766c9f95c8..43bbe8356d 100644 --- a/src/backend/commands/foreigncmds.c +++ b/src/backend/commands/foreigncmds.c @@ -13,6 +13,8 @@ */ #include "postgres.h" +#include "access/fdwxact.h" +#include "access/heapam.h" #include "access/htup_details.h" #include "access/reloptions.h" #include "access/table.h" @@ -1101,6 +1103,18 @@ RemoveForeignServerById(Oid srvId) if (!HeapTupleIsValid(tp)) elog(ERROR, "cache lookup failed for foreign server %u", srvId); + /* + * If there is a foreign prepared transaction with this foreign server, + * dropping it might result in dangling prepared transactions. + */ + if (fdwxact_exists(MyDatabaseId, srvId, InvalidOid)) + { + Form_pg_foreign_server srvForm = (Form_pg_foreign_server) GETSTRUCT(tp); + ereport(WARNING, + (errmsg("server \"%s\" has unresolved prepared transactions on it", + NameStr(srvForm->srvname)))); + } + CatalogTupleDelete(rel, &tp->t_self); ReleaseSysCache(tp); @@ -1419,6 +1433,15 @@ RemoveUserMapping(DropUserMappingStmt *stmt) user_mapping_ddl_aclcheck(useId, srv->serverid, srv->servername); + /* + * If there is a foreign prepared transaction with this user mapping, + * dropping it might result in dangling prepared transactions. + */ + if (fdwxact_exists(MyDatabaseId, srv->serverid, useId)) + ereport(WARNING, + (errmsg("server \"%s\" has unresolved prepared transactions for user \"%s\"", + srv->servername, MappingUserName(useId)))); + /* * Do the deletion */ @@ -1572,6 +1595,13 @@ ImportForeignSchema(ImportForeignSchemaStmt *stmt) errmsg("foreign-data wrapper \"%s\" does not support IMPORT FOREIGN SCHEMA", fdw->fdwname))); + /* + * Remember that the transaction accesses a foreign server. Normally + * ImportForeignSchema does not modify data on foreign servers, so register + * it as a not-modified server.
+ */ + RegisterFdwXactByServerId(server->serverid, false); + /* Call FDW to get a list of commands */ cmd_list = fdw_routine->ImportForeignSchema(stmt, server->serverid); diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c index d23f292cb0..690717c34e 100644 --- a/src/backend/executor/execPartition.c +++ b/src/backend/executor/execPartition.c @@ -13,6 +13,7 @@ */ #include "postgres.h" +#include "access/fdwxact.h" #include "access/table.h" #include "access/tableam.h" #include "catalog/partition.h" @@ -944,7 +945,14 @@ ExecInitRoutingInfo(ModifyTableState *mtstate, */ if (partRelInfo->ri_FdwRoutine != NULL && partRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL) + { + Relation child = partRelInfo->ri_RelationDesc; + + /* Remember that the transaction modifies data on a foreign server */ + RegisterFdwXactByRelId(RelationGetRelid(child), true); + partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo); + } partRelInfo->ri_PartitionInfo = partrouteinfo; partRelInfo->ri_CopyMultiInsertBuffer = NULL; diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 52af1dac5c..3ac56d1678 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -22,6 +22,8 @@ */ #include "postgres.h" +#include "access/fdwxact.h" +#include "access/xact.h" #include "executor/executor.h" #include "executor/nodeForeignscan.h" #include "foreign/fdwapi.h" @@ -224,9 +226,31 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags) * Tell the FDW to initialize the scan. */ if (node->operation != CMD_SELECT) + { + RangeTblEntry *rte; + + rte = exec_rt_fetch(estate->es_result_relation_info->ri_RangeTableIndex, + estate); + + /* Remember that the transaction modifies data on a foreign server */ + RegisterFdwXactByRelId(rte->relid, true); + fdwroutine->BeginDirectModify(scanstate, eflags); + } else + { + RangeTblEntry *rte; + int rtindex = (scanrelid > 0) ?
+ scanrelid : + bms_next_member(node->fs_relids, -1); + + rte = exec_rt_fetch(rtindex, estate); + + /* Remember that the transaction accesses a foreign server */ + RegisterFdwXactByRelId(rte->relid, false); + fdwroutine->BeginForeignScan(scanstate, eflags); + } return scanstate; } diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index cd91f9c8a8..c1ab3d829a 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -37,6 +37,7 @@ #include "postgres.h" +#include "access/fdwxact.h" #include "access/heapam.h" #include "access/htup_details.h" #include "access/tableam.h" @@ -47,6 +48,7 @@ #include "executor/executor.h" #include "executor/nodeModifyTable.h" #include "foreign/fdwapi.h" +#include "foreign/foreign.h" #include "miscadmin.h" #include "nodes/nodeFuncs.h" #include "rewrite/rewriteHandler.h" @@ -549,6 +551,10 @@ ExecInsert(ModifyTableState *mtstate, NULL, specToken); + /* Make note that we've written to a non-temporary relation */ + if (RelationNeedsWAL(resultRelationDesc)) + MyXactFlags |= XACT_FLAGS_WROTENONTEMPREL; + /* insert index entries for tuple */ recheckIndexes = ExecInsertIndexTuples(slot, estate, true, &specConflict, @@ -777,6 +783,10 @@ ldelete:; &tmfd, changingPart); + /* Make note that we've written to a non-temporary relation */ + if (RelationNeedsWAL(resultRelationDesc)) + MyXactFlags |= XACT_FLAGS_WROTENONTEMPREL; + switch (result) { case TM_SelfModified: @@ -1323,6 +1333,10 @@ lreplace:; true /* wait for commit */ , &tmfd, &lockmode, &update_indexes); + /* Make note that we've written to a non-temporary relation */ + if (RelationNeedsWAL(resultRelationDesc)) + MyXactFlags |= XACT_FLAGS_WROTENONTEMPREL; + switch (result) { case TM_SelfModified: @@ -2382,6 +2396,10 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) resultRelInfo->ri_FdwRoutine->BeginForeignModify != NULL) { List *fdw_private = (List *) list_nth(node->fdwPrivLists, i); + Oid relid =
RelationGetRelid(resultRelInfo->ri_RelationDesc); + + /* Remember that the transaction modifies data on a foreign server */ + RegisterFdwXactByRelId(relid, true); resultRelInfo->ri_FdwRoutine->BeginForeignModify(mtstate, resultRelInfo, diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c index c917ec40ff..0b17505aac 100644 --- a/src/backend/foreign/foreign.c +++ b/src/backend/foreign/foreign.c @@ -187,6 +187,49 @@ GetForeignServerByName(const char *srvname, bool missing_ok) return GetForeignServer(serverid); } +/* + * GetUserMappingByOid - look up the user mapping by user mapping oid. + * + * If userid of the mapping is invalid, we set it to current userid. + */ +UserMapping * +GetUserMappingByOid(Oid umid) +{ + Datum datum; + HeapTuple tp; + UserMapping *um; + bool isnull; + Form_pg_user_mapping tableform; + + tp = SearchSysCache1(USERMAPPINGOID, + ObjectIdGetDatum(umid)); + + if (!HeapTupleIsValid(tp)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("user mapping not found for %u", umid))); + + tableform = (Form_pg_user_mapping) GETSTRUCT(tp); + um = (UserMapping *) palloc(sizeof(UserMapping)); + um->umid = umid; + um->userid = OidIsValid(tableform->umuser) ? + tableform->umuser : GetUserId(); + um->serverid = tableform->umserver; + + /* Extract the umoptions */ + datum = SysCacheGetAttr(USERMAPPINGOID, + tp, + Anum_pg_user_mapping_umoptions, + &isnull); + if (isnull) + um->options = NIL; + else + um->options = untransformRelOptions(datum); + + ReleaseSysCache(tp); + + return um; +} /* * GetUserMapping - look up the user mapping.
@@ -328,6 +371,20 @@ GetFdwRoutine(Oid fdwhandler) elog(ERROR, "foreign-data wrapper handler function %u did not return an FdwRoutine struct", fdwhandler); + /* Sanity check for transaction management callbacks */ + if ((routine->CommitForeignTransaction && + !routine->RollbackForeignTransaction) || + (!routine->CommitForeignTransaction && + routine->RollbackForeignTransaction)) + elog(ERROR, + "foreign-data wrapper must support either both commit and rollback routines or neither"); + + if (routine->PrepareForeignTransaction && + (!routine->CommitForeignTransaction || + !routine->RollbackForeignTransaction)) + elog(ERROR, + "foreign-data wrapper that supports prepare routine must support both commit and rollback routines"); + return routine; } diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 5f8a007e73..0a8890a984 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -14,6 +14,8 @@ #include <unistd.h> +#include "access/fdwxact_launcher.h" +#include "access/fdwxact_resolver.h" #include "access/parallel.h" #include "libpq/pqsignal.h" #include "miscadmin.h" @@ -129,6 +131,12 @@ static const struct }, { "ApplyWorkerMain", ApplyWorkerMain + }, + { + "FdwXactResolverMain", FdwXactResolverMain + }, + { + "FdwXactLauncherMain", FdwXactLauncherMain } }; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index fabcf31de8..0d3932c2cf 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3650,6 +3650,12 @@ pgstat_get_wait_activity(WaitEventActivity w) case WAIT_EVENT_CHECKPOINTER_MAIN: event_name = "CheckpointerMain"; break; + case WAIT_EVENT_FDWXACT_RESOLVER_MAIN: + event_name = "FdwXactResolverMain"; + break; + case WAIT_EVENT_FDWXACT_LAUNCHER_MAIN: + event_name = "FdwXactLauncherMain"; + break; case WAIT_EVENT_LOGICAL_APPLY_MAIN: event_name = "LogicalApplyMain"; break; @@ -3853,6 +3859,12 @@ pgstat_get_wait_ipc(WaitEventIPC w) case
WAIT_EVENT_SYNC_REP: event_name = "SyncRep"; break; + case WAIT_EVENT_FDWXACT: + event_name = "FdwXact"; + break; + case WAIT_EVENT_FDWXACT_RESOLUTION: + event_name = "FdwXactResolution"; + break; /* no default case, so that compiler will warn */ } @@ -4068,6 +4080,15 @@ pgstat_get_wait_io(WaitEventIO w) case WAIT_EVENT_TWOPHASE_FILE_WRITE: event_name = "TwophaseFileWrite"; break; + case WAIT_EVENT_FDWXACT_FILE_WRITE: + event_name = "FdwXactFileWrite"; + break; + case WAIT_EVENT_FDWXACT_FILE_READ: + event_name = "FdwXactFileRead"; + break; + case WAIT_EVENT_FDWXACT_FILE_SYNC: + event_name = "FdwXactFileSync"; + break; case WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ: event_name = "WALSenderTimelineHistoryRead"; break; diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 9ff2832c00..f92be8387d 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -93,6 +93,8 @@ #include <pthread.h> #endif +#include "access/fdwxact_resolver.h" +#include "access/fdwxact_launcher.h" #include "access/transam.h" #include "access/xlog.h" #include "bootstrap/bootstrap.h" @@ -909,6 +911,10 @@ PostmasterMain(int argc, char *argv[]) ereport(ERROR, (errmsg("WAL streaming (max_wal_senders > 0) requires wal_level \"replica\" or \"logical\""))); + if (max_prepared_foreign_xacts > 0 && max_foreign_xact_resolvers == 0) + ereport(ERROR, + (errmsg("preparing foreign transactions (max_prepared_foreign_transactions > 0) requires max_foreign_transaction_resolvers > 0"))); + /* * Other one-time internal sanity checks can go here, if they are fast. * (Put any slow processing further down, after postmaster.pid creation.) @@ -984,12 +990,13 @@ PostmasterMain(int argc, char *argv[]) #endif /* - * Register the apply launcher. Since it registers a background worker, - * it needs to be called before InitializeMaxBackends(), and it's probably - * a good idea to call it before any modules had chance to take the - * background worker slots.
+ * Register the apply launcher and foreign transaction launcher. Since + * each registers a background worker, they need to be called before + * InitializeMaxBackends(), and it's probably a good idea to call them + * before any modules have had a chance to take the background worker slots. */ ApplyLauncherRegister(); + FdwXactLauncherRegister(); /* * process any libraries that should be preloaded at postmaster start diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index bc532d027b..6269f384af 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -151,6 +151,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor case RM_COMMIT_TS_ID: case RM_REPLORIGIN_ID: case RM_GENERIC_ID: + case RM_FDWXACT_ID: /* just deal with xid, and done */ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), buf.origptr); diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 4829953ee6..6bde7a735a 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -16,6 +16,8 @@ #include "access/clog.h" #include "access/commit_ts.h" +#include "access/fdwxact.h" +#include "access/fdwxact_launcher.h" #include "access/heapam.h" #include "access/multixact.h" #include "access/nbtree.h" @@ -147,6 +149,8 @@ CreateSharedMemoryAndSemaphores(void) size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); + size = add_size(size, FdwXactShmemSize()); + size = add_size(size, FdwXactRslvShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -263,6 +267,8 @@ CreateSharedMemoryAndSemaphores(void) BTreeShmemInit(); SyncScanShmemInit(); AsyncShmemInit(); + FdwXactShmemInit(); + FdwXactRslvShmemInit(); #ifdef EXEC_BACKEND diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 13bcbe77de..020eb76b6a
100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -93,6 +93,8 @@ typedef struct ProcArrayStruct TransactionId replication_slot_xmin; /* oldest catalog xmin of any replication slot */ TransactionId replication_slot_catalog_xmin; + /* local transaction ID of the oldest unresolved distributed transaction */ + TransactionId fdwxact_unresolved_xmin; /* indexes into allPgXact[], has PROCARRAY_MAXPROCS entries */ int pgprocnos[FLEXIBLE_ARRAY_MEMBER]; @@ -248,6 +250,7 @@ CreateSharedProcArray(void) procArray->lastOverflowedXid = InvalidTransactionId; procArray->replication_slot_xmin = InvalidTransactionId; procArray->replication_slot_catalog_xmin = InvalidTransactionId; + procArray->fdwxact_unresolved_xmin = InvalidTransactionId; } allProcs = ProcGlobal->allProcs; @@ -1312,6 +1315,7 @@ GetOldestXmin(Relation rel, int flags) TransactionId replication_slot_xmin = InvalidTransactionId; TransactionId replication_slot_catalog_xmin = InvalidTransactionId; + TransactionId fdwxact_unresolved_xmin = InvalidTransactionId; /* * If we're not computing a relation specific limit, or if a shared @@ -1377,6 +1381,7 @@ GetOldestXmin(Relation rel, int flags) */ replication_slot_xmin = procArray->replication_slot_xmin; replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin; + fdwxact_unresolved_xmin = procArray->fdwxact_unresolved_xmin; if (RecoveryInProgress()) { @@ -1426,6 +1431,15 @@ GetOldestXmin(Relation rel, int flags) NormalTransactionIdPrecedes(replication_slot_xmin, result)) result = replication_slot_xmin; + /* + * Check whether there are unresolved distributed transactions + * requiring an older xmin.
+ */ + if (!(flags & PROCARRAY_FDWXACT_XMIN) && + TransactionIdIsValid(fdwxact_unresolved_xmin) && + NormalTransactionIdPrecedes(fdwxact_unresolved_xmin, result)) + result = fdwxact_unresolved_xmin; + /* * After locks have been released and vacuum_defer_cleanup_age has been * applied, check whether we need to back up further to make logical @@ -3128,6 +3142,38 @@ ProcArrayGetReplicationSlotXmin(TransactionId *xmin, LWLockRelease(ProcArrayLock); } +/* + * ProcArraySetFdwXactUnresolvedXmin + * + * Install a limit on future computations of the xmin horizon, to prevent + * vacuum and clog truncation from removing data still needed to resolve + * distributed transactions. + */ +void +ProcArraySetFdwXactUnresolvedXmin(TransactionId xmin) +{ + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + procArray->fdwxact_unresolved_xmin = xmin; + LWLockRelease(ProcArrayLock); +} + +/* + * ProcArrayGetFdwXactUnresolvedXmin + * + * Return the current unresolved xmin limit. + */ +TransactionId +ProcArrayGetFdwXactUnresolvedXmin(void) +{ + TransactionId xmin; + + LWLockAcquire(ProcArrayLock, LW_SHARED); + xmin = procArray->fdwxact_unresolved_xmin; + LWLockRelease(ProcArrayLock); + + return xmin; +} #define XidCacheRemove(i) \ do { \ diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index db47843229..adb276370c 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -49,3 +49,6 @@ MultiXactTruncationLock 41 OldSnapshotTimeMapLock 42 LogicalRepWorkerLock 43 CLogTruncationLock 44 +FdwXactLock 45 +FdwXactResolverLock 46 +FdwXactResolutionLock 47 diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index fff0628e58..af5e418a03 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -35,6 +35,7 @@ #include <unistd.h> #include <sys/time.h> +#include "access/fdwxact.h" #include "access/transam.h" #include "access/twophase.h" #include "access/xact.h" @@
-421,6 +422,10 @@ InitProcess(void) MyProc->syncRepState = SYNC_REP_NOT_WAITING; SHMQueueElemInit(&(MyProc->syncRepLinks)); + /* Initialize fields for fdw xact */ + MyProc->fdwXactState = FDWXACT_NOT_WAITING; + SHMQueueElemInit(&(MyProc->fdwXactLinks)); + /* Initialize fields for group XID clearing. */ MyProc->procArrayGroupMember = false; MyProc->procArrayGroupMemberXid = InvalidTransactionId; @@ -822,6 +827,9 @@ ProcKill(int code, Datum arg) /* Make sure we're out of the sync rep lists */ SyncRepCleanupAtProcExit(); + /* Make sure we're out of the fdwxact lists */ + FdwXactCleanupAtProcExit(); + #ifdef USE_ASSERT_CHECKING { int i; diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 3b85e48333..a0f8498862 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -36,6 +36,8 @@ #include "rusagestub.h" #endif +#include "access/fdwxact_resolver.h" +#include "access/fdwxact_launcher.h" #include "access/parallel.h" #include "access/printtup.h" #include "access/xact.h" @@ -3029,6 +3031,18 @@ ProcessInterrupts(void) */ proc_exit(1); } + else if (IsFdwXactResolver()) + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating foreign transaction resolver due to administrator command"))); + else if (IsFdwXactLauncher()) + { + /* + * The foreign transaction launcher can be stopped at any time. + * Use exit status 1 so the background worker is restarted. 
+ */ + proc_exit(1); + } else if (RecoveryConflictPending && RecoveryConflictRetryable) { pgstat_report_recovery_conflict(RecoveryConflictReason); diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index ba74bf9f7d..d38c33b64c 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -27,6 +27,7 @@ #endif #include "access/commit_ts.h" +#include "access/fdwxact.h" #include "access/gin.h" #include "access/rmgr.h" #include "access/tableam.h" @@ -399,6 +400,25 @@ static const struct config_enum_entry synchronous_commit_options[] = { {NULL, 0, false} }; +/* + * Although only "required", "prefer", and "disabled" are documented, + * we accept all the likely variants of "on" and "off". + */ +static const struct config_enum_entry foreign_twophase_commit_options[] = { + {"required", FOREIGN_TWOPHASE_COMMIT_REQUIRED, false}, + {"prefer", FOREIGN_TWOPHASE_COMMIT_PREFER, false}, + {"disabled", FOREIGN_TWOPHASE_COMMIT_DISABLED, false}, + {"on", FOREIGN_TWOPHASE_COMMIT_REQUIRED, false}, + {"off", FOREIGN_TWOPHASE_COMMIT_DISABLED, false}, + {"true", FOREIGN_TWOPHASE_COMMIT_REQUIRED, true}, + {"false", FOREIGN_TWOPHASE_COMMIT_DISABLED, true}, + {"yes", FOREIGN_TWOPHASE_COMMIT_REQUIRED, true}, + {"no", FOREIGN_TWOPHASE_COMMIT_DISABLED, true}, + {"1", FOREIGN_TWOPHASE_COMMIT_REQUIRED, true}, + {"0", FOREIGN_TWOPHASE_COMMIT_DISABLED, true}, + {NULL, 0, false} +}; + /* * Although only "on", "off", "try" are documented, we accept all the likely * variants of "on" and "off". 
@@ -725,6 +745,12 @@ const char *const config_group_names[] = gettext_noop("Client Connection Defaults / Other Defaults"), /* LOCK_MANAGEMENT */ gettext_noop("Lock Management"), + /* FDWXACT */ + gettext_noop("Foreign Transaction Management"), + /* FDWXACT_SETTINGS */ + gettext_noop("Foreign Transaction Management / Settings"), + /* FDWXACT_RESOLVER */ + gettext_noop("Foreign Transaction Management / Resolver"), /* COMPAT_OPTIONS */ gettext_noop("Version and Platform Compatibility"), /* COMPAT_OPTIONS_PREVIOUS */ @@ -2370,6 +2396,52 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + /* + * See also CheckRequiredParameterValues() if this parameter changes + */ + { + {"max_prepared_foreign_transactions", PGC_POSTMASTER, RESOURCES_MEM, + gettext_noop("Sets the maximum number of simultaneously prepared transactions on foreign servers."), + NULL + }, + &max_prepared_foreign_xacts, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + + { + {"foreign_transaction_resolver_timeout", PGC_SIGHUP, FDWXACT_RESOLVER, + gettext_noop("Sets the maximum time to wait for foreign transaction resolution."), + NULL, + GUC_UNIT_MS + }, + &foreign_xact_resolver_timeout, + 60 * 1000, 0, INT_MAX, + NULL, NULL, NULL + }, + + { + {"max_foreign_transaction_resolvers", PGC_POSTMASTER, RESOURCES_MEM, + gettext_noop("Sets the maximum number of foreign transaction resolver processes."), + NULL + }, + &max_foreign_xact_resolvers, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + + { + {"foreign_transaction_resolution_retry_interval", PGC_SIGHUP, FDWXACT_RESOLVER, + gettext_noop("Sets the time to wait before retrying to resolve foreign transactions " + "after a failed attempt."), + NULL, + GUC_UNIT_MS + }, + &foreign_xact_resolution_retry_interval, + 5000, 1, INT_MAX, + NULL, NULL, NULL + }, + #ifdef LOCK_DEBUG { {"trace_lock_oidmin", PGC_SUSET, DEVELOPER_OPTIONS, @@ -4413,6 +4485,16 @@ static struct config_enum ConfigureNamesEnum[] = NULL, assign_synchronous_commit, NULL }, + {
{"foreign_twophase_commit", PGC_USERSET, FDWXACT_SETTINGS, + gettext_noop("Sets whether to use two-phase commit on foreign servers for the current transaction."), + NULL + }, + &foreign_twophase_commit, + FOREIGN_TWOPHASE_COMMIT_DISABLED, foreign_twophase_commit_options, + check_foreign_twophase_commit, NULL, NULL + }, + { {"archive_mode", PGC_POSTMASTER, WAL_ARCHIVING, gettext_noop("Allows archiving of WAL files using archive_command."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 9541879c1f..22e014aecd 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -125,6 +125,8 @@ #temp_buffers = 8MB # min 800kB #max_prepared_transactions = 0 # zero disables the feature # (change requires restart) +#max_prepared_foreign_transactions = 0 # zero disables the feature + # (change requires restart) # Caution: it is not advisable to set max_prepared_transactions nonzero unless # you actively intend to use prepared transactions.
#work_mem = 4MB # min 64kB @@ -341,6 +343,20 @@ #max_sync_workers_per_subscription = 2 # taken from max_logical_replication_workers +#------------------------------------------------------------------------------ +# FOREIGN TRANSACTION +#------------------------------------------------------------------------------ + +#foreign_twophase_commit = disabled + +#max_foreign_transaction_resolvers = 0 # max number of resolver processes + # (change requires restart) +#foreign_transaction_resolver_timeout = 60s # in milliseconds; 0 disables +#foreign_transaction_resolution_retry_interval = 5s # time to wait before + # retrying to resolve + # foreign transactions + # after a failed attempt + #------------------------------------------------------------------------------ # QUERY TUNING #------------------------------------------------------------------------------ diff --git a/src/backend/utils/probes.d b/src/backend/utils/probes.d index f08a49c9dd..dd8878025b 100644 --- a/src/backend/utils/probes.d +++ b/src/backend/utils/probes.d @@ -81,6 +81,8 @@ provider postgresql { probe multixact__checkpoint__done(bool); probe twophase__checkpoint__start(); probe twophase__checkpoint__done(); + probe fdwxact__checkpoint__start(); + probe fdwxact__checkpoint__done(); probe smgr__md__read__start(ForkNumber, BlockNumber, Oid, Oid, Oid, int); probe smgr__md__read__done(ForkNumber, BlockNumber, Oid, Oid, Oid, int, int, int); diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c index 1f6d8939be..49dc5a519f 100644 --- a/src/bin/initdb/initdb.c +++ b/src/bin/initdb/initdb.c @@ -210,6 +210,7 @@ static const char *const subdirs[] = { "pg_snapshots", "pg_subtrans", "pg_twophase", + "pg_fdwxact", "pg_multixact", "pg_multixact/members", "pg_multixact/offsets", diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c index 19e21ab491..9ae3bfe4dd 100644 --- a/src/bin/pg_controldata/pg_controldata.c +++ b/src/bin/pg_controldata/pg_controldata.c @@ -301,6 +301,8
@@ main(int argc, char *argv[]) ControlFile->max_wal_senders); printf(_("max_prepared_xacts setting: %d\n"), ControlFile->max_prepared_xacts); + printf(_("max_prepared_foreign_transactions setting: %d\n"), + ControlFile->max_prepared_foreign_xacts); printf(_("max_locks_per_xact setting: %d\n"), ControlFile->max_locks_per_xact); printf(_("track_commit_timestamp setting: %s\n"), diff --git a/src/bin/pg_resetwal/pg_resetwal.c b/src/bin/pg_resetwal/pg_resetwal.c index 2e286f6339..c5ee22132e 100644 --- a/src/bin/pg_resetwal/pg_resetwal.c +++ b/src/bin/pg_resetwal/pg_resetwal.c @@ -710,6 +710,7 @@ GuessControlValues(void) ControlFile.max_wal_senders = 10; ControlFile.max_worker_processes = 8; ControlFile.max_prepared_xacts = 0; + ControlFile.max_prepared_foreign_xacts = 0; ControlFile.max_locks_per_xact = 64; ControlFile.maxAlign = MAXIMUM_ALIGNOF; @@ -914,6 +915,7 @@ RewriteControlFile(void) ControlFile.max_wal_senders = 10; ControlFile.max_worker_processes = 8; ControlFile.max_prepared_xacts = 0; + ControlFile.max_prepared_foreign_xacts = 0; ControlFile.max_locks_per_xact = 64; /* The control file gets flushed here. 
*/ diff --git a/src/bin/pg_waldump/fdwxactdesc.c b/src/bin/pg_waldump/fdwxactdesc.c new file mode 120000 index 0000000000..ce8c21880c --- /dev/null +++ b/src/bin/pg_waldump/fdwxactdesc.c @@ -0,0 +1 @@ +../../../src/backend/access/rmgrdesc/fdwxactdesc.c \ No newline at end of file diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index 852d8ca4b1..b616cea347 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -11,6 +11,7 @@ #include "access/brin_xlog.h" #include "access/clog.h" #include "access/commit_ts.h" +#include "access/fdwxact_xlog.h" #include "access/generic_xlog.h" #include "access/ginxlog.h" #include "access/gistxlog.h" diff --git a/src/include/access/fdwxact.h b/src/include/access/fdwxact.h new file mode 100644 index 0000000000..147d41c708 --- /dev/null +++ b/src/include/access/fdwxact.h @@ -0,0 +1,165 @@ +/* + * fdwxact.h + * + * PostgreSQL global transaction manager + * + * Portions Copyright (c) 2018, PostgreSQL Global Development Group + * + * src/include/access/fdwxact.h + */ +#ifndef FDWXACT_H +#define FDWXACT_H + +#include "access/fdwxact_xlog.h" +#include "access/xlogreader.h" +#include "foreign/foreign.h" +#include "lib/stringinfo.h" +#include "miscadmin.h" +#include "nodes/pg_list.h" +#include "nodes/execnodes.h" +#include "storage/backendid.h" +#include "storage/proc.h" +#include "storage/shmem.h" +#include "utils/guc.h" +#include "utils/timeout.h" +#include "utils/timestamp.h" + +/* fdwXactState */ +#define FDWXACT_NOT_WAITING 0 +#define FDWXACT_WAITING 1 +#define FDWXACT_WAIT_COMPLETE 2 + +/* Flag passed to FDW transaction management APIs */ +#define FDWXACT_FLAG_ONEPHASE 0x01 /* transaction can commit/rollback + without preparation */ + +/* Enum for foreign_twophase_commit parameter */ +typedef enum +{ + FOREIGN_TWOPHASE_COMMIT_DISABLED, /* disable foreign twophase commit */ + FOREIGN_TWOPHASE_COMMIT_PREFER, /* use twophase commit where available */ + FOREIGN_TWOPHASE_COMMIT_REQUIRED /* 
all foreign servers have to support + twophase commit */ +} ForeignTwophaseCommitLevel; + +/* Enum to track the status of a foreign transaction */ +typedef enum +{ + FDWXACT_STATUS_INVALID, + FDWXACT_STATUS_INITIAL, + FDWXACT_STATUS_PREPARING, /* foreign transaction is being prepared */ + FDWXACT_STATUS_PREPARED, /* foreign transaction is prepared */ + FDWXACT_STATUS_COMMITTING, /* foreign prepared transaction is to + * be committed */ + FDWXACT_STATUS_ABORTING, /* foreign prepared transaction is to be + * aborted */ + FDWXACT_STATUS_RESOLVED +} FdwXactStatus; + +typedef struct FdwXactData *FdwXact; + +/* + * Shared memory state of a single foreign transaction. + */ +typedef struct FdwXactData +{ + FdwXact fdwxact_free_next; /* Next free FdwXact entry */ + + Oid dbid; /* database oid where to find foreign server + * and user mapping */ + TransactionId local_xid; /* XID of local transaction */ + Oid serverid; /* foreign server where transaction takes + * place */ + Oid userid; /* user who initiated the foreign + * transaction */ + Oid umid; + bool indoubt; /* is this an in-doubt transaction? */ + slock_t mutex; /* Protect the above fields */ + + /* The status of the foreign transaction, protected by FdwXactLock */ + FdwXactStatus status; + /* + * Note that we need to keep track of two LSNs for each FdwXact. We keep + * track of the start LSN because this is the address we must use to read + * state data back from WAL when committing a FdwXact. We keep track of + * the end LSN because that is the LSN we need to wait for prior to + * commit. + */ + XLogRecPtr insert_start_lsn; /* start LSN of the record inserting this entry */ + XLogRecPtr insert_end_lsn; /* end LSN of the record inserting this entry */ + + bool valid; /* has the entry been completed and written to file?
*/ + BackendId held_by; /* backend holding this entry */ + bool ondisk; /* true if prepare state file is on disk */ + bool inredo; /* true if entry was added via xlog_redo */ + + char fdwxact_id[FDWXACT_ID_MAX_LEN]; /* prepared transaction identifier */ +} FdwXactData; + +/* + * Shared memory layout for maintaining foreign prepared transaction entries. + * Adding or removing an FdwXact entry requires holding FdwXactLock in exclusive + * mode; iterating fdwXacts requires holding it in shared mode. + */ +typedef struct +{ + /* Head of linked list of free FdwXactData structs */ + FdwXact free_fdwxacts; + + /* Number of valid foreign transaction entries */ + int num_fdwxacts; + + /* Up to max_prepared_foreign_xacts entries in the array */ + FdwXact fdwxacts[FLEXIBLE_ARRAY_MEMBER]; /* Variable length array */ +} FdwXactCtlData; + +/* Pointer to the shared memory holding the foreign transaction data */ +FdwXactCtlData *FdwXactCtl; + +/* State data for foreign transaction resolution, passed to FDW callbacks */ +typedef struct FdwXactRslvState +{ + /* Foreign transaction information */ + char *fdwxact_id; + + ForeignServer *server; + UserMapping *usermapping; + + int flags; /* OR of FDWXACT_FLAG_xx flags */ +} FdwXactRslvState; + +/* GUC parameters */ +extern int max_prepared_foreign_xacts; +extern int max_foreign_xact_resolvers; +extern int foreign_xact_resolution_retry_interval; +extern int foreign_xact_resolver_timeout; +extern int foreign_twophase_commit; + +/* Function declarations */ +extern Size FdwXactShmemSize(void); +extern void FdwXactShmemInit(void); +extern void restoreFdwXactData(void); +extern TransactionId PrescanFdwXacts(TransactionId oldestActiveXid); +extern void RecoverFdwXacts(void); +extern void AtEOXact_FdwXacts(bool is_commit); +extern void AtPrepare_FdwXacts(void); +extern bool fdwxact_exists(Oid dboid, Oid serverid, Oid userid); +extern void CheckPointFdwXacts(XLogRecPtr redo_horizon); +extern bool FdwTwoPhaseNeeded(void); +extern void PreCommit_FdwXacts(void);
+extern void KnownFdwXactRecreateFiles(XLogRecPtr redo_horizon); +extern void FdwXactWaitToBeResolved(TransactionId wait_xid, bool commit); +extern bool FdwXactIsForeignTwophaseCommitRequired(void); +extern void FdwXactResolveTransactionAndReleaseWaiter(Oid dbid, TransactionId xid, + PGPROC *waiter); +extern bool FdwXactResolveInDoubtTransactions(Oid dbid); +extern PGPROC *FdwXactGetWaiter(TimestampTz *nextResolutionTs_p, TransactionId *waitXid_p); +extern void FdwXactCleanupAtProcExit(void); +extern void RegisterFdwXactByRelId(Oid relid, bool modified); +extern void RegisterFdwXactByServerId(Oid serverid, bool modified); +extern void FdwXactMarkForeignServerAccessed(Oid relid, bool modified); +extern bool check_foreign_twophase_commit(int *newval, void **extra, + GucSource source); +extern bool FdwXactWaiterExists(Oid dbid); + +#endif /* FDWXACT_H */ diff --git a/src/include/access/fdwxact_launcher.h b/src/include/access/fdwxact_launcher.h new file mode 100644 index 0000000000..dd0f5d16ff --- /dev/null +++ b/src/include/access/fdwxact_launcher.h @@ -0,0 +1,29 @@ +/*------------------------------------------------------------------------- + * + * fdwxact_launcher.h + * PostgreSQL foreign transaction launcher definitions + * + * + * Portions Copyright (c) 2019, PostgreSQL Global Development Group + * + * src/include/access/fdwxact_launcher.h + * + *------------------------------------------------------------------------- + */ + +#ifndef FDWXACT_LAUNCHER_H +#define FDWXACT_LAUNCHER_H + +#include "access/fdwxact.h" + +extern void FdwXactLauncherRegister(void); +extern void FdwXactLauncherMain(Datum main_arg); +extern void FdwXactLauncherRequestToLaunch(void); +extern void FdwXactLauncherRequestToLaunchForRetry(void); +extern void FdwXactLaunchOrWakeupResolver(void); +extern Size FdwXactRslvShmemSize(void); +extern void FdwXactRslvShmemInit(void); +extern bool IsFdwXactLauncher(void); + + +#endif /* FDWXACT_LAUNCHER_H */ diff --git 
a/src/include/access/fdwxact_resolver.h b/src/include/access/fdwxact_resolver.h new file mode 100644 index 0000000000..2607654024 --- /dev/null +++ b/src/include/access/fdwxact_resolver.h @@ -0,0 +1,23 @@ +/*------------------------------------------------------------------------- + * + * fdwxact_resolver.h + * PostgreSQL foreign transaction resolver definitions + * + * + * Portions Copyright (c) 2019, PostgreSQL Global Development Group + * + * src/include/access/fdwxact_resolver.h + * + *------------------------------------------------------------------------- + */ +#ifndef FDWXACT_RESOLVER_H +#define FDWXACT_RESOLVER_H + +#include "access/fdwxact.h" + +extern void FdwXactResolverMain(Datum main_arg); +extern bool IsFdwXactResolver(void); + +extern int foreign_xact_resolver_timeout; + +#endif /* FDWXACT_RESOLVER_H */ diff --git a/src/include/access/fdwxact_xlog.h b/src/include/access/fdwxact_xlog.h new file mode 100644 index 0000000000..39ca66beef --- /dev/null +++ b/src/include/access/fdwxact_xlog.h @@ -0,0 +1,54 @@ +/*------------------------------------------------------------------------- + * + * fdwxact_xlog.h + * Foreign transaction XLOG definitions. 
+ * + * + * Portions Copyright (c) 2019, PostgreSQL Global Development Group + * + * src/include/access/fdwxact_xlog.h + * + *------------------------------------------------------------------------- + */ +#ifndef FDWXACT_XLOG_H +#define FDWXACT_XLOG_H + +#include "access/xlogreader.h" +#include "lib/stringinfo.h" + +/* Info types for logs related to FDW transactions */ +#define XLOG_FDWXACT_INSERT 0x00 +#define XLOG_FDWXACT_REMOVE 0x10 + +/* Maximum length of the prepared transaction id, borrowed from twophase.c */ +#define FDWXACT_ID_MAX_LEN 200 + +/* + * On-disk file structure, also used for WAL records + */ +typedef struct +{ + TransactionId local_xid; + Oid dbid; /* database oid where to find foreign server + * and user mapping */ + Oid serverid; /* foreign server where transaction takes + * place */ + Oid userid; /* user who initiated the foreign transaction */ + Oid umid; + char fdwxact_id[FDWXACT_ID_MAX_LEN]; /* foreign txn prepare id */ +} FdwXactOnDiskData; + +typedef struct xl_fdwxact_remove +{ + TransactionId xid; + Oid serverid; + Oid userid; + Oid dbid; + bool force; +} xl_fdwxact_remove; + +extern void fdwxact_redo(XLogReaderState *record); +extern void fdwxact_desc(StringInfo buf, XLogReaderState *record); +extern const char *fdwxact_identify(uint8 info); + +#endif /* FDWXACT_XLOG_H */ diff --git a/src/include/access/resolver_internal.h b/src/include/access/resolver_internal.h new file mode 100644 index 0000000000..55fc970b69 --- /dev/null +++ b/src/include/access/resolver_internal.h @@ -0,0 +1,66 @@ +/*------------------------------------------------------------------------- + * + * resolver_internal.h + * Internal headers shared by fdwxact resolvers.
+ * + * Portions Copyright (c) 2019, PostgreSQL Global Development Group + * + * src/include/access/resolver_internal.h + * + *------------------------------------------------------------------------- + */ + +#ifndef RESOLVER_INTERNAL_H +#define RESOLVER_INTERNAL_H + +#include "storage/latch.h" +#include "storage/shmem.h" +#include "storage/spin.h" +#include "utils/timestamp.h" + +/* + * Each foreign transaction resolver has an FdwXactResolver struct in + * shared memory. This struct is protected by FdwXactResolverLaunchLock. + */ +typedef struct FdwXactResolver +{ + pid_t pid; /* this resolver's PID, or 0 if not active */ + Oid dbid; /* database oid */ + + /* Indicates whether this slot is used or free */ + bool in_use; + + /* Stats */ + TimestampTz last_resolved_time; + + /* Protects the shared variables above */ + slock_t mutex; + + /* + * Pointer to the resolver's latch. Used by backends to wake up this + * resolver when it has work to do. NULL if the resolver isn't active. + */ + Latch *latch; +} FdwXactResolver; + +/* There is one FdwXactRslvCtlData struct for the whole database cluster */ +typedef struct FdwXactRslvCtlData +{ + /* Foreign transaction resolution queue.
Protected by FdwXactLock */ + SHM_QUEUE fdwxact_queue; + + /* Supervisor process and latch */ + pid_t launcher_pid; + Latch *launcher_latch; + + FdwXactResolver resolvers[FLEXIBLE_ARRAY_MEMBER]; +} FdwXactRslvCtlData; +#define SizeOfFdwXactRslvCtlData \ + (offsetof(FdwXactRslvCtlData, resolvers) + sizeof(FdwXactResolver)) + +extern FdwXactRslvCtlData *FdwXactRslvCtl; + +extern FdwXactResolver *MyFdwXactResolver; +extern FdwXactRslvCtlData *FdwXactRslvCtl; + +#endif /* RESOLVER_INTERNAL_H */ diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 3c0db2ccf5..5798b4cd99 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL) PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask) PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL) +PG_RMGR(RM_FDWXACT_ID, "Foreign Transactions", fdwxact_redo, fdwxact_desc, fdwxact_identify, NULL, NULL, NULL) diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 02b5315c43..e8c094d708 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -36,6 +36,7 @@ extern void PostPrepare_Twophase(void); extern PGPROC *TwoPhaseGetDummyProc(TransactionId xid, bool lock_held); extern BackendId TwoPhaseGetDummyBackendId(TransactionId xid, bool lock_held); +extern bool TwoPhaseExists(TransactionId xid); extern GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid, TimestampTz prepared_at, diff --git a/src/include/access/xact.h b/src/include/access/xact.h index cb5c4935d2..a75e6998f0 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -108,6 +108,13 @@ extern int MyXactFlags; */ #define 
XACT_FLAGS_WROTENONTEMPREL (1U << 2) +/* + * XACT_FLAGS_FDWNOPREPARE - set when we wrote data on a foreign table + * whose foreign server isn't capable of two-phase + * commit. + */ +#define XACT_FLAGS_FDWNOPREPARE (1U << 3) + /* * start- and end-of-transaction callbacks for dynamically loaded modules */ diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index e295dc65fb..d1ce20242f 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -232,6 +232,7 @@ typedef struct xl_parameter_change int max_worker_processes; int max_wal_senders; int max_prepared_xacts; + int max_prepared_foreign_xacts; int max_locks_per_xact; int wal_level; bool wal_log_hints; diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index cf7d4485e9..f2174a0208 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -179,6 +179,7 @@ typedef struct ControlFileData int max_worker_processes; int max_wal_senders; int max_prepared_xacts; + int max_prepared_foreign_xacts; int max_locks_per_xact; bool track_commit_timestamp; diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index ac8f64b219..1072c38aa6 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5184,6 +5184,13 @@ proargmodes => '{i,o,o,o,o,o,o,o,o}', proargnames => '{subid,subid,relid,pid,received_lsn,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time}', prosrc => 'pg_stat_get_subscription' }, +{ oid => '9705', descr => 'statistics: information about foreign transaction resolvers', + proname => 'pg_stat_get_foreign_xact', proisstrict => 'f', provolatile => 's', + proparallel => 'r', prorettype => 'record', proargtypes => '', + proallargtypes => '{int4,oid,timestamptz}', + proargmodes => '{o,o,o}', + proargnames => '{pid,dbid,last_resolved_time}', + prosrc => 'pg_stat_get_foreign_xact' }, { oid => '2026', descr => 'statistics:
current backend PID', proname => 'pg_backend_pid', provolatile => 's', proparallel => 'r', prorettype => 'int4', proargtypes => '', prosrc => 'pg_backend_pid' }, @@ -5897,6 +5904,24 @@ proargnames => '{type,object_names,object_args,classid,objid,objsubid}', prosrc => 'pg_get_object_address' }, +{ oid => '9706', descr => 'view foreign transactions', + proname => 'pg_foreign_xacts', prorows => '1000', proretset => 't', + provolatile => 'v', prorettype => 'record', proargtypes => '', + proallargtypes => '{oid,xid,oid,oid,text,bool,text}', + proargmodes => '{o,o,o,o,o,o,o}', + proargnames => '{dbid,xid,serverid,userid,status,in_doubt,identifier}', + prosrc => 'pg_foreign_xacts' }, +{ oid => '9707', descr => 'remove foreign transaction without resolution', + proname => 'pg_remove_foreign_xact', provolatile => 'v', prorettype => 'bool', + proargtypes => 'xid oid oid', + proargnames => '{xid,serverid,userid}', + prosrc => 'pg_remove_foreign_xact' }, +{ oid => '9708', descr => 'resolve one foreign transaction', + proname => 'pg_resolve_foreign_xact', provolatile => 'v', prorettype => 'bool', + proargtypes => 'xid oid oid', + proargnames => '{xid,serverid,userid}', + prosrc => 'pg_resolve_foreign_xact' }, + { oid => '2079', descr => 'is table visible in search path?', proname => 'pg_table_is_visible', procost => '10', provolatile => 's', prorettype => 'bool', proargtypes => 'oid', prosrc => 'pg_table_is_visible' }, @@ -6015,6 +6040,10 @@ { oid => '2851', descr => 'wal filename, given a wal location', proname => 'pg_walfile_name', prorettype => 'text', proargtypes => 'pg_lsn', prosrc => 'pg_walfile_name' }, +{ oid => '9709', + descr => 'stop a foreign transaction resolver process running on the given database', + proname => 'pg_stop_foreign_xact_resolver', provolatile => 'v', prorettype => 'bool', + proargtypes => 'oid', prosrc => 'pg_stop_foreign_xact_resolver' }, { oid => '3165', descr => 'difference in bytes, given two wal locations', proname => 'pg_wal_lsn_diff',
prorettype => 'numeric', diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 822686033e..c7b33d72ec 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -12,6 +12,7 @@ #ifndef FDWAPI_H #define FDWAPI_H +#include "access/fdwxact.h" #include "access/parallel.h" #include "nodes/execnodes.h" #include "nodes/pathnodes.h" @@ -169,6 +170,11 @@ typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root, typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root, List *fdw_private, RelOptInfo *child_rel); +typedef void (*PrepareForeignTransaction_function) (FdwXactRslvState *frstate); +typedef void (*CommitForeignTransaction_function) (FdwXactRslvState *frstate); +typedef void (*RollbackForeignTransaction_function) (FdwXactRslvState *frstate); +typedef char *(*GetPrepareId_function) (TransactionId xid, Oid serverid, + Oid userid, int *prep_id_len); /* * FdwRoutine is the struct returned by a foreign-data wrapper's handler @@ -236,6 +242,12 @@ typedef struct FdwRoutine /* Support functions for IMPORT FOREIGN SCHEMA */ ImportForeignSchema_function ImportForeignSchema; + /* Support functions for transaction management */ + PrepareForeignTransaction_function PrepareForeignTransaction; + CommitForeignTransaction_function CommitForeignTransaction; + RollbackForeignTransaction_function RollbackForeignTransaction; + GetPrepareId_function GetPrepareId; + /* Support functions for parallelism under Gather node */ IsForeignScanParallelSafe_function IsForeignScanParallelSafe; EstimateDSMForeignScan_function EstimateDSMForeignScan; diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h index 4de157c19c..91c2276915 100644 --- a/src/include/foreign/foreign.h +++ b/src/include/foreign/foreign.h @@ -69,6 +69,7 @@ extern ForeignServer *GetForeignServerExtended(Oid serverid, bits16 flags); extern ForeignServer *GetForeignServerByName(const char *name, bool missing_ok); extern UserMapping 
*GetUserMapping(Oid userid, Oid serverid); +extern UserMapping *GetUserMappingByOid(Oid umid); extern ForeignDataWrapper *GetForeignDataWrapper(Oid fdwid); extern ForeignDataWrapper *GetForeignDataWrapperExtended(Oid fdwid, bits16 flags); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index fe076d823d..d82d8f7abc 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -776,6 +776,8 @@ typedef enum WAIT_EVENT_BGWRITER_HIBERNATE, WAIT_EVENT_BGWRITER_MAIN, WAIT_EVENT_CHECKPOINTER_MAIN, + WAIT_EVENT_FDWXACT_RESOLVER_MAIN, + WAIT_EVENT_FDWXACT_LAUNCHER_MAIN, WAIT_EVENT_LOGICAL_APPLY_MAIN, WAIT_EVENT_LOGICAL_LAUNCHER_MAIN, WAIT_EVENT_PGSTAT_MAIN, @@ -853,7 +855,9 @@ typedef enum WAIT_EVENT_REPLICATION_ORIGIN_DROP, WAIT_EVENT_REPLICATION_SLOT_DROP, WAIT_EVENT_SAFE_SNAPSHOT, - WAIT_EVENT_SYNC_REP + WAIT_EVENT_SYNC_REP, + WAIT_EVENT_FDWXACT, + WAIT_EVENT_FDWXACT_RESOLUTION } WaitEventIPC; /* ---------- @@ -933,6 +937,9 @@ typedef enum WAIT_EVENT_TWOPHASE_FILE_READ, WAIT_EVENT_TWOPHASE_FILE_SYNC, WAIT_EVENT_TWOPHASE_FILE_WRITE, + WAIT_EVENT_FDWXACT_FILE_READ, + WAIT_EVENT_FDWXACT_FILE_WRITE, + WAIT_EVENT_FDWXACT_FILE_SYNC, WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ, WAIT_EVENT_WAL_BOOTSTRAP_SYNC, WAIT_EVENT_WAL_BOOTSTRAP_WRITE, diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 281e1db725..c802201193 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -16,6 +16,7 @@ #include "access/clog.h" #include "access/xlogdefs.h" +#include "datatype/timestamp.h" #include "lib/ilist.h" #include "storage/latch.h" #include "storage/lock.h" @@ -152,6 +153,16 @@ struct PGPROC int syncRepState; /* wait state for sync rep */ SHM_QUEUE syncRepLinks; /* list link if process is in syncrep queue */ + /* + * Info to allow us to wait for foreign transaction to be resolved, if + * needed. 
+ */ + TransactionId fdwXactWaitXid; /* waiting for foreign transaction involved with + * this transaction id to be resolved */ + int fdwXactState; /* wait state for foreign transaction resolution */ + SHM_QUEUE fdwXactLinks; /* list link if process is in queue */ + TimestampTz fdwXactNextResolutionTs; + /* * All PROCLOCK objects for locks held or awaited by this backend are * linked into one of these lists, according to the partition number of diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 8f67b860e7..deb293c1a9 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -36,6 +36,8 @@ #define PROCARRAY_SLOTS_XMIN 0x20 /* replication slot xmin, * catalog_xmin */ +#define PROCARRAY_FDWXACT_XMIN 0x40 /* unresolved distributed + * transaction xmin */ /* * Only flags in PROCARRAY_PROC_FLAGS_MASK are considered when matching * PGXACT->vacuumFlags. Other flags are used for different purposes and @@ -125,4 +127,7 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin, extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin, TransactionId *catalog_xmin); + +extern void ProcArraySetFdwXactUnresolvedXmin(TransactionId xmin); +extern TransactionId ProcArrayGetFdwXactUnresolvedXmin(void); #endif /* PROCARRAY_H */ diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h index d68976fafa..d5fec50969 100644 --- a/src/include/utils/guc_tables.h +++ b/src/include/utils/guc_tables.h @@ -96,6 +96,9 @@ enum config_group CLIENT_CONN_PRELOAD, CLIENT_CONN_OTHER, LOCK_MANAGEMENT, + FDWXACT, + FDWXACT_SETTINGS, + FDWXACT_RESOLVER, COMPAT_OPTIONS, COMPAT_OPTIONS_PREVIOUS, COMPAT_OPTIONS_CLIENT, diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index c9cc569404..ed229d5a67 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1341,6 +1341,14 @@ pg_file_settings| SELECT a.sourcefile, a.applied, a.error FROM
pg_show_all_file_settings() a(sourcefile, sourceline, seqno, name, setting, applied, error); +pg_foreign_xacts| SELECT f.dbid, + f.xid, + f.serverid, + f.userid, + f.status, + f.in_doubt, + f.identifier + FROM pg_foreign_xacts() f(dbid, xid, serverid, userid, status, in_doubt, identifier); pg_group| SELECT pg_authid.rolname AS groname, pg_authid.oid AS grosysid, ARRAY( SELECT pg_auth_members.member @@ -1841,6 +1849,11 @@ pg_stat_database_conflicts| SELECT d.oid AS datid, pg_stat_get_db_conflict_bufferpin(d.oid) AS confl_bufferpin, pg_stat_get_db_conflict_startup_deadlock(d.oid) AS confl_deadlock FROM pg_database d; +pg_stat_foreign_xact| SELECT r.pid, + r.dbid, + r.last_resolved_time + FROM pg_stat_get_foreign_xact() r(pid, dbid, last_resolved_time) + WHERE (r.pid IS NOT NULL); pg_stat_gssapi| SELECT s.pid, s.gss_auth AS gss_authenticated, s.gss_princ AS principal, -- 2.23.0 From 3363abd531595233fb59e0ab6078a011ab8060e9 Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horikyota.ntt@gmail.com> Date: Thu, 5 Dec 2019 17:01:08 +0900 Subject: [PATCH v26 3/5] Documentation update. 
Original Author: Masahiko Sawada <sawada.mshk@gmail.com> --- doc/src/sgml/catalogs.sgml | 145 +++++++++++++ doc/src/sgml/config.sgml | 146 ++++++++++++- doc/src/sgml/distributed-transaction.sgml | 158 +++++++++++++++ doc/src/sgml/fdwhandler.sgml | 236 ++++++++++++++++++++++ doc/src/sgml/filelist.sgml | 1 + doc/src/sgml/func.sgml | 89 ++++++++ doc/src/sgml/monitoring.sgml | 60 ++++++ doc/src/sgml/postgres.sgml | 1 + doc/src/sgml/storage.sgml | 6 + 9 files changed, 841 insertions(+), 1 deletion(-) create mode 100644 doc/src/sgml/distributed-transaction.sgml diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 55694c4368..1b720da03d 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -8267,6 +8267,11 @@ SCRAM-SHA-256$<replaceable><iteration count></replaceable>:<replaceable>&l <entry>open cursors</entry> </row> + <row> + <entry><link linkend="view-pg-foreign-xacts"><structname>pg_foreign_xacts</structname></link></entry> + <entry>foreign transactions</entry> + </row> + <row> <entry><link linkend="view-pg-file-settings"><structname>pg_file_settings</structname></link></entry> <entry>summary of configuration file contents</entry> @@ -9712,6 +9717,146 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx </sect1> + <sect1 id="view-pg-foreign-xacts"> + <title><structname>pg_foreign_xacts</structname></title> + + <indexterm zone="view-pg-foreign-xacts"> + <primary>pg_foreign_xacts</primary> + </indexterm> + + <para> + The view <structname>pg_foreign_xacts</structname> displays + information about foreign transactions that are opened on + foreign servers for atomic distributed transaction commit (see + <xref linkend="atomic-commit"/> for details). + </para> + + <para> + <structname>pg_foreign_xacts</structname> contains one row per foreign + transaction. An entry is removed when the foreign transaction is + committed or rolled back. 
+ </para> + + <table> + <title><structname>pg_foreign_xacts</structname> Columns</title> + + <tgroup cols="4"> + <thead> + <row> + <entry>Name</entry> + <entry>Type</entry> + <entry>References</entry> + <entry>Description</entry> + </row> + </thead> + <tbody> + <row> + <entry><structfield>dbid</structfield></entry> + <entry><type>oid</type></entry> + <entry><literal><link linkend="catalog-pg-database"><structname>pg_database</structname></link>.oid</literal></entry> + <entry> + OID of the database in which the foreign transaction resides + </entry> + </row> + <row> + <entry><structfield>xid</structfield></entry> + <entry><type>xid</type></entry> + <entry></entry> + <entry> + Numeric identifier of the local transaction with which this foreign + transaction is associated + </entry> + </row> + <row> + <entry><structfield>serverid</structfield></entry> + <entry><type>oid</type></entry> + <entry><literal><link linkend="catalog-pg-foreign-server"><structname>pg_foreign_server</structname></link>.oid</literal></entry> + <entry> + The OID of the foreign server on which the foreign transaction is prepared + </entry> + </row> + <row> + <entry><structfield>userid</structfield></entry> + <entry><type>oid</type></entry> + <entry><literal><link linkend="catalog-pg-authid"><structname>pg_authid</structname></link>.oid</literal></entry> + <entry> + The OID of the user that prepared this foreign transaction. + </entry> + </row> + <row> + <entry><structfield>status</structfield></entry> + <entry><type>text</type></entry> + <entry></entry> + <entry> + Status of the foreign transaction. Possible values are: + <itemizedlist> + <listitem> + <para> + <literal>initial</literal> : Initial status. + </para> + </listitem> + <listitem> + <para> + <literal>preparing</literal> : This foreign transaction is being prepared. + </para> + </listitem> + <listitem> + <para> + <literal>prepared</literal> : This foreign transaction has been prepared.
+ </para> + </listitem> + <listitem> + <para> + <literal>committing</literal> : This foreign transaction is being committed. + </para> + </listitem> + <listitem> + <para> + <literal>aborting</literal> : This foreign transaction is being aborted. + </para> + </listitem> + <listitem> + <para> + <literal>resolved</literal> : This foreign transaction has been resolved. + </para> + </listitem> + </itemizedlist> + </entry> + </row> + <row> + <entry><structfield>in_doubt</structfield></entry> + <entry><type>boolean</type></entry> + <entry></entry> + <entry> + If <literal>true</literal>, this foreign transaction is in doubt and + needs to be resolved by calling the <function>pg_resolve_foreign_xact</function> + function. + </entry> + </row> + <row> + <entry><structfield>identifier</structfield></entry> + <entry><type>text</type></entry> + <entry></entry> + <entry> + The identifier of the prepared foreign transaction. + </entry> + </row> + </tbody> + </tgroup> + </table> + + <para> + When the <structname>pg_foreign_xacts</structname> view is accessed, the + internal transaction manager data structures are momentarily locked, and + a copy is made for the view to display. This ensures that the + view produces a consistent set of results, while not blocking + normal operations longer than necessary. Nonetheless + there could be some impact on database performance if this view is + frequently accessed.
+ </para> + + </sect1> + <sect1 id="view-pg-publication-tables"> <title><structname>pg_publication_tables</structname></title> diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 53ac14490a..69778750f3 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4378,7 +4378,6 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class=" </variablelist> </sect2> - </sect1> <sect1 id="runtime-config-query"> @@ -8818,6 +8817,151 @@ dynamic_library_path = 'C:\tools\postgresql;H:\my_project\lib;$libdir' </variablelist> </sect1> + <sect1 id="runtime-config-distributed-transaction"> + <title>Distributed Transaction Management</title> + + <sect2 id="runtime-config-distributed-transaction-settings"> + <title>Settings</title> + <variablelist> + + <varlistentry id="guc-foreign-twophase-commit" xreflabel="foreign_twophase_commit"> + <term><varname>foreign_twophase_commit</varname> (<type>enum</type>) + <indexterm> + <primary><varname>foreign_twophase_commit</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Specifies whether transaction commit will wait for all involved foreign + transactions to be resolved before the command returns a "success" + indication to the client. Valid values are <literal>required</literal>, + <literal>prefer</literal> and <literal>disabled</literal>. The default + setting is <literal>disabled</literal>. When set to + <literal>disabled</literal>, the two-phase commit protocol is not used to + commit or roll back distributed transactions. When set to + <literal>required</literal>, the distributed transaction strictly + requires that all servers written to support the two-phase commit protocol. + That is, the distributed transaction cannot commit if even one server + does not support the transaction management callback routines + (described in <xref linkend="fdw-callbacks-transaction-managements"/>).
When set to <literal>prefer</literal>, the distributed transaction uses the + two-phase commit protocol only on servers where it is available and commits in one phase on the + others. Note that when set to <literal>disabled</literal> or + <literal>prefer</literal>, data on the servers involved in the distributed + transaction can become inconsistent if a + foreign server crashes while the distributed transaction is being committed. + </para> + + <para> + Both <varname>max_prepared_foreign_transactions</varname> and + <varname>max_foreign_transaction_resolvers</varname> must be set to non-zero + values to set this parameter to either <literal>required</literal> or + <literal>prefer</literal>. + </para> + + <para> + This parameter can be changed at any time; the behavior for any one + transaction is determined by the setting in effect when it commits. + </para> + </listitem> + </varlistentry> + + <varlistentry id="guc-max-prepared-foreign-transactions" xreflabel="max_prepared_foreign_transactions"> + <term><varname>max_prepared_foreign_transactions</varname> (<type>integer</type>) + <indexterm> + <primary><varname>max_prepared_foreign_transactions</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Sets the maximum number of foreign transactions that can be prepared + simultaneously. A single local transaction can give rise to multiple + foreign transactions. If <literal>N</literal> local transactions each + span <literal>K</literal> foreign servers, this value needs to be set to + <literal>N * K</literal>, not just <literal>N</literal>. + This parameter can only be set at server start. + </para> + <para> + When running a standby server, you must set this parameter to the + same or higher value than on the master server. Otherwise, queries + will not be allowed in the standby server.
+ </para> + </listitem> + </varlistentry> + + </variablelist> + </sect2> + + <sect2 id="runtime-config-foreign-transaction-resolver"> + <title>Foreign Transaction Resolvers</title> + + <para> + These settings control the behavior of a foreign transaction resolver. + </para> + + <variablelist> + <varlistentry id="guc-max-foreign-transaction-resolvers" xreflabel="max_foreign_transaction_resolvers"> + <term><varname>max_foreign_transaction_resolvers</varname> (<type>integer</type>) + <indexterm> + <primary><varname>max_foreign_transaction_resolvers</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Specifies the maximum number of foreign transaction resolution workers. A foreign transaction + resolver is responsible for foreign transaction resolution on one database. + </para> + <para> + Foreign transaction resolution workers are taken from the pool defined by + <varname>max_worker_processes</varname>. + </para> + <para> + The default value is 0. + </para> + </listitem> + </varlistentry> + + <varlistentry id="guc-foreign-transaction-resolution-rety-interval" xreflabel="foreign_transaction_resolution_retry_interval"> + <term><varname>foreign_transaction_resolution_retry_interval</varname> (<type>integer</type>) + <indexterm> + <primary><varname>foreign_transaction_resolution_retry_interval</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Specifies how long the foreign transaction resolver waits after a failed resolution + attempt before retrying. This parameter can only be set in the + <filename>postgresql.conf</filename> file or on the server command line. + </para> + <para> + The default value is 10 seconds.
+ </para> + </listitem> + </varlistentry> + + <varlistentry id="guc-foreign-transaction-resolver-timeout" xreflabel="foreign_transaction_resolver_timeout"> + <term><varname>foreign_transaction_resolver_timeout</varname> (<type>integer</type>) + <indexterm> + <primary><varname>foreign_transaction_resolver_timeout</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Terminates foreign transaction resolver processes that have had no foreign + transactions to resolve for longer than the specified number of milliseconds. + A value of zero disables the timeout, so a resolver stays connected to its + database until it is stopped manually. This parameter can only be set in the + <filename>postgresql.conf</filename> file or on the server command line. + </para> + <para> + The default value is 60 seconds. + </para> + </listitem> + </varlistentry> + </variablelist> + </sect2> + </sect1> + <sect1 id="runtime-config-compatible"> <title>Version and Platform Compatibility</title> diff --git a/doc/src/sgml/distributed-transaction.sgml b/doc/src/sgml/distributed-transaction.sgml new file mode 100644 index 0000000000..350b1afe68 --- /dev/null +++ b/doc/src/sgml/distributed-transaction.sgml @@ -0,0 +1,158 @@ +<!-- doc/src/sgml/distributed-transaction.sgml --> + +<chapter id="distributed-transaction"> + <title>Distributed Transaction</title> + + <para> + A distributed transaction is a transaction in which two or more network hosts + are involved. <productname>PostgreSQL</productname>'s global transaction + manager supports distributed transactions that access foreign servers using + Foreign Data Wrappers. The global transaction manager is responsible for + managing transactions on foreign servers. + </para> + + <sect1 id="atomic-commit"> + <title>Atomic Commit</title> + + <para> + Atomic commit of a distributed transaction is an operation that applies a set + of changes as a single operation globally.
This guarantees all-or-nothing + results for the changes on all remote hosts involved. + <productname>PostgreSQL</productname> provides a way to perform read-write + transactions with foreign resources using foreign data wrappers. + Using <productname>PostgreSQL</productname>'s atomic commit ensures that + all changes on foreign servers end in either commit or rollback using the + transaction callback routines + (see <xref linkend="fdw-callbacks-transaction-managements"/>). + </para> + + <sect2> + <title>Atomic Commit Using Two-phase Commit Protocol</title> + + <para> + To commit atomically across all foreign servers, + <productname>PostgreSQL</productname> employs the two-phase commit protocol, + which is a type of atomic commitment protocol (ACP). + The <productname>PostgreSQL</productname> server that receives the SQL is called + the <firstterm>coordinator node</firstterm>, which is responsible for coordinating + all the participating transactions. Using the two-phase commit protocol, a + distributed transaction is committed in the following steps. + <orderedlist> + <listitem> + <para> + Prepare all transactions on foreign servers. + </para> + </listitem> + <listitem> + <para> + Commit locally. + </para> + </listitem> + <listitem> + <para> + Resolve all prepared transactions on foreign servers. + </para> + </listitem> + </orderedlist> + + </para> + + <para> + In the first step, the <productname>PostgreSQL</productname> distributed + transaction manager prepares all transactions on the foreign servers if + two-phase commit is required. Two-phase commit is required when the + transaction modifies data on two or more servers, including the local server + itself, and <xref linkend="guc-foreign-twophase-commit"/> is + <literal>required</literal> or <literal>prefer</literal>. If all foreign servers + prepare successfully, the commit proceeds to the next step.
If any failure occurs + in this step, <productname>PostgreSQL</productname> switches to rollback and + rolls back all transactions on both the local and foreign servers. + </para> + + <para> + In the local commit step, <productname>PostgreSQL</productname> commits the + transaction locally. If any failure occurs in this step, + <productname>PostgreSQL</productname> switches to rollback and rolls back all + transactions on both the local and foreign servers. + </para> + + <para> + In the final step, prepared transactions are resolved by a foreign transaction + resolver process. + </para> + </sect2> + + <sect2 id="atomic-commit-transaction-resolution"> + <title>Foreign Transaction Resolver Processes</title> + + <para> + Foreign transaction resolver processes are auxiliary processes that are + responsible for foreign transaction resolution. They commit or roll back all + prepared transactions on foreign servers if the coordinator received agreement + messages from all foreign servers during the first step. + </para> + + <para> + One foreign transaction resolver is responsible for transaction resolution + on one database on the coordinator side. If resolution fails, the resolver + retries at intervals of + <varname>foreign_transaction_resolution_retry_interval</varname>. + </para> + + <note> + <para> + While a foreign transaction resolver process is connected to a database, + that database cannot be dropped. To drop the database, call the + <function>pg_stop_fdwxact_resolver</function> function before dropping + the database. + </para> + </note> + </sect2> + + <sect2 id="atomic-commit-in-doubt-transaction"> + <title>Manual Resolution of In-Doubt Transactions</title> + + <para> + The atomic commit mechanism ensures that all foreign servers either commit + or roll back using the two-phase commit protocol.
However, distributed transactions + become <firstterm>in-doubt</firstterm> in three cases: when a foreign + server crashes or connectivity to it is lost while the foreign transaction + is being prepared; when the coordinator node crashes while preparing or + resolving the distributed transaction; and when the user cancels the query. You can + check for in-doubt transactions in the <xref linkend="view-pg-foreign-xacts"/> + view. These foreign transactions need to be resolved by using the + <function>pg_resolve_foreign_xact</function> function. + <productname>PostgreSQL</productname> doesn't have facilities to automatically + resolve in-doubt transactions. This behavior might change in a future release. + </para> + </sect2> + + <sect2 id="atomic-commit-monitoring"> + <title>Monitoring</title> + <para> + Monitoring information about foreign transaction resolvers is visible in the + <link linkend="pg-stat-foreign-xact-view"><literal>pg_stat_foreign_xact</literal></link> + view. This view contains one row for every foreign transaction resolver worker. + </para> + </sect2> + + <sect2> + <title>Configuration Settings</title> + + <para> + Atomic commit requires several configuration options to be set. + </para> + + <para> + On the coordinator side, <xref linkend="guc-max-prepared-foreign-transactions"/> and + <xref linkend="guc-max-foreign-transaction-resolvers"/> must be set to non-zero values. + Additionally, <varname>max_worker_processes</varname> may need to be adjusted to + accommodate foreign transaction resolver workers: it must be at least + (<varname>max_foreign_transaction_resolvers</varname> + <literal>1</literal>). + Note that some extensions and parallel queries also take worker slots from + <varname>max_worker_processes</varname>.
+ </para> + + </sect2> + </sect1> +</chapter> diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml index 6587678af2..dd0358ef22 100644 --- a/doc/src/sgml/fdwhandler.sgml +++ b/doc/src/sgml/fdwhandler.sgml @@ -1415,6 +1415,127 @@ ReparameterizeForeignPathByChild(PlannerInfo *root, List *fdw_private, </para> </sect2> + <sect2 id="fdw-callbacks-transaction-managements"> + <title>FDW Routines for Transaction Management</title> + + <para> + Transaction management callbacks are used to commit, roll back and + prepare foreign transactions. If an FDW wants its foreign + transactions to be managed by <productname>PostgreSQL</productname>'s global + transaction manager, it must provide both + <function>CommitForeignTransaction</function> and + <function>RollbackForeignTransaction</function>. In addition, if an FDW + wishes to support <firstterm>atomic commit</firstterm> (as described in + <xref linkend="fdw-transaction-managements"/>), it must provide + <function>PrepareForeignTransaction</function> as well and can optionally provide + the <function>GetPrepareId</function> callback. + </para> + + <para> +<programlisting> +void +PrepareForeignTransaction(FdwXactRslvState *frstate); +</programlisting> + Prepare the transaction on the foreign server. This function is called in the + pre-commit phase of the local transaction if foreign two-phase commit is + required. This function is used only for distributed transaction management + (see <xref linkend="distributed-transaction"/>). + </para> + + <para> + Note that this callback function is always executed by backend processes. + </para> + <para> +<programlisting> +bool +CommitForeignTransaction(FdwXactRslvState *frstate); +</programlisting> + Commit the foreign transaction. This function is called either in + the pre-commit phase of the local transaction, if the transaction + can be committed in one phase, or in the post-commit phase, if + two-phase commit is required.
If <literal>frstate->flags</literal> has + the flag <literal>FDW_XACT_FLAG_ONEPHASE</literal> set, the transaction + can be committed in one phase; otherwise this function must commit the prepared + transaction identified by <literal>frstate->fdwxact_id</literal>. + </para> + + <para> + The foreign transaction identified by <literal>frstate->fdwxact_id</literal> + might not exist on the foreign servers. This can happen when, for instance, + the <productname>PostgreSQL</productname> server crashed while preparing or + committing the foreign transaction. Therefore, this function needs to + tolerate the undefined object error + (<literal>ERRCODE_UNDEFINED_OBJECT</literal>) rather than raising an error. + </para> + + <para> + Note that in all cases except when called through the <function>pg_resolve_foreign_xact</function> + SQL function, this callback function is executed by foreign transaction + resolver processes. + </para> + <para> +<programlisting> +bool +RollbackForeignTransaction(FdwXactRslvState *frstate); +</programlisting> + Roll back the foreign transaction. This function is called at + the end of the local transaction, after it has been rolled back locally. The foreign + transactions are rolled back when the user requests a rollback or when + any error occurs during the transaction. This function must tolerate + being called recursively if an error occurs while rolling back the foreign + transaction, so it needs to track recursion to avoid being called + infinitely. If <literal>frstate->flags</literal> has the flag + <literal>FDW_XACT_FLAG_ONEPHASE</literal> set, the transaction can be rolled + back in one phase; otherwise this function must roll back the prepared + transaction identified by <literal>frstate->fdwxact_id</literal>. + </para> + + <para> + The foreign transaction identified by <literal>frstate->fdwxact_id</literal> + might not exist on the foreign servers.
This can happen when, for instance, + the <productname>PostgreSQL</productname> server crashed while preparing or + committing the foreign transaction. Therefore, this function needs to + tolerate the undefined object error + (<literal>ERRCODE_UNDEFINED_OBJECT</literal>) rather than raising an error. + </para> + + <para> + Note that in all cases except when called through the <function>pg_resolve_foreign_xact</function> + SQL function, this callback function is executed by foreign transaction + resolver processes. + </para> + <para> +<programlisting> +char * +GetPrepareId(TransactionId xid, Oid serverid, Oid userid, int *prep_id_len); +</programlisting> + Return a null-terminated string representing the prepared transaction identifier, + and set <varname>*prep_id_len</varname> to its length. + This optional function is called once per foreign server during executor + startup. Note that the transaction identifier must be a string literal + less than <symbol>NAMEDATALEN</symbol> bytes long and should be distinct from + any other concurrently prepared transaction identifier. If this callback routine + is not supported, <productname>PostgreSQL</productname>'s distributed + transaction manager generates a unique identifier in the form + <literal>fx_<random value up to 2<superscript>31</superscript>>_<server oid>_<user oid></literal>. + </para> + + <para> + Note that this callback function is always executed by backend processes. + </para> + + <note> + <para> + The functions <function>PrepareForeignTransaction</function>, + <function>CommitForeignTransaction</function> and + <function>RollbackForeignTransaction</function> are called + outside of a valid transaction state. Therefore + you cannot use functions that rely on the system catalog cache, + such as the Foreign Data Wrapper helper functions described in + <xref linkend="fdw-helpers"/>.
+ </para> + </note> + </sect2> </sect1> <sect1 id="fdw-helpers"> @@ -1894,4 +2015,119 @@ GetForeignServerByName(const char *name, bool missing_ok); </sect1> + <sect1 id="fdw-transaction-managements"> + <title>Transaction Management for Foreign Data Wrappers</title> + <para> + If an FDW's server supports transactions, it is usually worthwhile for the + FDW to manage the transactions opened on the foreign server. The FDW callback + functions <literal>CommitForeignTransaction</literal>, + <literal>RollbackForeignTransaction</literal> and + <literal>PrepareForeignTransaction</literal> are used for transaction + management and must fit into the working of the + <productname>PostgreSQL</productname> transaction processing. + </para> + + <para> + The information in <literal>FdwXactRslvState</literal> can be used to get + information about the foreign server being processed, such as the server name, the OID of the + server, the user, and the user mapping. The <literal>flags</literal> field contains flag + bits describing the foreign transaction state for transaction management. + </para> + + <sect2 id="fdw-transaction-commit-rollback"> + <title>Commit and Rollback of a Single Foreign Transaction</title> + <para> + The FDW callback functions <literal>CommitForeignTransaction</literal> + and <literal>RollbackForeignTransaction</literal> can be used to commit + and roll back the foreign transaction. During transaction commit, the core + transaction manager calls the <literal>CommitForeignTransaction</literal> function + in the pre-commit phase; on rollback, it calls the + <literal>RollbackForeignTransaction</literal> function in the post-rollback + phase.
+ </para> + </sect2> + + <sect2 id="fdw-transaction-distributed-transaction-commit"> + <title>Atomic Commit and Rollback of Distributed Transactions</title> + <para> + In addition to the simple commit and rollback of foreign transactions described in + <xref linkend="fdw-transaction-commit-rollback"/>, + the <productname>PostgreSQL</productname> global transaction manager enables + distributed transactions to commit and roll back atomically across all foreign + servers, which is known in the literature as atomic commit. To achieve atomic + commit, <productname>PostgreSQL</productname> employs the two-phase commit + protocol, which is a type of atomic commitment protocol. Every FDW that wishes + to support the two-phase commit protocol is required to provide the FDW callback + function <function>PrepareForeignTransaction</function> and optionally + <function>GetPrepareId</function>, in addition to + <function>CommitForeignTransaction</function> and + <function>RollbackForeignTransaction</function> + (see <xref linkend="fdw-callbacks-transaction-managements"/> for details). + </para> + + <para> + An example of a distributed transaction is as follows: +<programlisting> +BEGIN; +UPDATE ft1 SET col = 'a'; +UPDATE ft2 SET col = 'b'; +COMMIT; +</programlisting> + Here ft1 and ft2 are foreign tables on different foreign servers, which may be using different + Foreign Data Wrappers. + </para> + + <para> + When the core executor accesses the foreign servers, each foreign server whose FDW + supports the transaction management callback routines is registered as a participant. + During registration, <function>GetPrepareId</function> is called, if provided, to + generate a unique transaction identifier. + </para> + + <para> + During the pre-commit phase of the local transaction, the foreign transaction manager + persists the foreign transaction information to disk and WAL, and then + prepares all foreign transactions by calling + <function>PrepareForeignTransaction</function> if the two-phase commit protocol + is required.
Two-phase commit is required when the transaction modified data + on more than one server, including the local server itself, and the user requests + foreign two-phase commit (see <xref linkend="guc-foreign-twophase-commit"/>). + </para> + + <para> + <productname>PostgreSQL</productname> can commit locally and go to the next + step if and only if all foreign transactions are prepared successfully. + If any failure happens or the user cancels the query during preparation, + the distributed transaction manager switches to rollback and calls + <function>RollbackForeignTransaction</function>. + </para> + + <para> + Note that when <literal>(frstate->flags & FDWXACT_FLAG_ONEPHASE)</literal> + is true, both the <literal>CommitForeignTransaction</literal> function and the + <literal>RollbackForeignTransaction</literal> function should commit or + roll back directly, rather than processing prepared transactions. This can + happen when two-phase commit is not required or the foreign server was not + modified within the transaction. + </para> + + <para> + Once all foreign transactions are prepared, the core transaction manager commits + locally. After that, the transaction commit waits for all prepared foreign + transactions to be committed before completion. After all prepared foreign + transactions are resolved, the transaction commit completes. + </para> + + <para> + One foreign transaction resolver process is responsible for foreign + transaction resolution on a database. The foreign transaction resolver process + calls either <function>CommitForeignTransaction</function> or + <function>RollbackForeignTransaction</function> to resolve the foreign + transaction identified by <literal>frstate->fdwxact_id</literal>. If resolution + fails, the resolver process exits with an error message. The foreign + transaction launcher launches the resolver process again after the + <xref linkend="guc-foreign-transaction-resolution-rety-interval"/> interval.
+ </para> + </sect2> + </sect1> </chapter> diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml index 3da2365ea9..80a87fa5d1 100644 --- a/doc/src/sgml/filelist.sgml +++ b/doc/src/sgml/filelist.sgml @@ -48,6 +48,7 @@ <!ENTITY wal SYSTEM "wal.sgml"> <!ENTITY logical-replication SYSTEM "logical-replication.sgml"> <!ENTITY jit SYSTEM "jit.sgml"> +<!ENTITY distributed-transaction SYSTEM "distributed-transaction.sgml"> <!-- programmer's guide --> <!ENTITY bgworker SYSTEM "bgworker.sgml"> diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 57a1539506..b9a918b9ee 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -22355,6 +22355,95 @@ SELECT (pg_stat_file('filename')).modification; </sect2> + <sect2 id="functions-foreign-transaction"> + <title>Foreign Transaction Management Functions</title> + + <indexterm> + <primary>pg_resolve_foreign_xact</primary> + </indexterm> + <indexterm> + <primary>pg_remove_foreign_xact</primary> + </indexterm> + + <para> + <xref linkend="functions-fdw-transaction-control-table"/> shows the functions + available for foreign transaction management. + These functions cannot be executed during recovery. Use of these functions + is restricted to superusers. + </para> + + <table id="functions-fdw-transaction-control-table"> + <title>Foreign Transaction Management Functions</title> + <tgroup cols="3"> + <thead> + <row><entry>Name</entry> <entry>Return Type</entry> <entry>Description</entry></row> + </thead> + + <tbody> + <row> + <entry> + <literal><function>pg_resolve_foreign_xact(<parameter>transaction</parameter> <type>xid</type>, <parameter>serverid</parameter> <type>oid</type>, <parameter>userid</parameter> <type>oid</type>)</function></literal> + </entry> + <entry><type>bool</type></entry> + <entry> + Resolve a foreign transaction. This function searches for a foreign + transaction matching the arguments and resolves it.
Once the foreign + transaction is resolved successfully, this function removes the + corresponding entry from <xref linkend="view-pg-foreign-xacts"/>. + This function won't resolve a foreign transaction that is currently being + processed. + </entry> + </row> + <row> + <entry> + <literal><function>pg_remove_foreign_xact(<parameter>transaction</parameter> <type>xid</type>, <parameter>serverid</parameter> <type>oid</type>, <parameter>userid</parameter> <type>oid</type>)</function></literal> + </entry> + <entry><type>void</type></entry> + <entry> + This function works the same as <function>pg_resolve_foreign_xact</function> + except that it removes the foreign transaction entry without resolving it. + </entry> + </row> + </tbody> + </tgroup> + </table> + + <para> + The function shown in <xref linkend="functions-fdwxact-resolver-control-table"/> + controls the foreign transaction resolvers. + </para> + + <table id="functions-fdwxact-resolver-control-table"> + <title>Foreign Transaction Resolver Control Functions</title> + <tgroup cols="3"> + <thead> + <row> + <entry>Name</entry> <entry>Return Type</entry> <entry>Description</entry> + </row> + </thead> + + <tbody> + <row> + <entry> + <literal><function>pg_stop_fdwxact_resolver(<parameter>dbid</parameter> <type>oid</type>)</function></literal> + </entry> + <entry><type>bool</type></entry> + <entry> + Stop the foreign transaction resolver running on the given database. + This function is useful for stopping a resolver process on the database + that you want to drop. + </entry> + </row> + </tbody> + </tgroup> + </table> + + <para> + <function>pg_stop_fdwxact_resolver</function> is useful before + dropping a database to which a foreign transaction resolver is connected.
+ </para> + + </sect2> </sect1> <sect1 id="functions-trigger"> diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index a3c5f86b7e..65938e81ca 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -368,6 +368,14 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser </entry> </row> + <row> + <entry><structname>pg_stat_foreign_xact</structname><indexterm><primary>pg_stat_foreign_xact</primary></indexterm></entry> + <entry>One row per foreign transaction resolver process, showing statistics about + foreign transaction resolution. See <xref linkend="pg-stat-foreign-xact-view"/> for + details. + </entry> + </row> + </tbody> </tgroup> </table> @@ -1236,6 +1244,18 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser <entry><literal>CheckpointerMain</literal></entry> <entry>Waiting in main loop of checkpointer process.</entry> </row> + <row> + <entry><literal>FdwXactLauncherMain</literal></entry> + <entry>Waiting in main loop of foreign transaction resolution launcher process.</entry> + </row> + <row> + <entry><literal>FdwXactResolverMain</literal></entry> + <entry>Waiting in main loop of foreign transaction resolution worker process.</entry> + </row> + <row> + <entry><literal>LogicalLauncherMain</literal></entry> + <entry>Waiting in main loop of logical launcher process.</entry> + </row> <row> <entry><literal>LogicalApplyMain</literal></entry> <entry>Waiting in main loop of logical apply process.</entry> @@ -1459,6 +1479,10 @@ postgres 27093 0.0 0.0 30096 2752 ?
Ss 11:34 0:00 postgres: ser <entry><literal>SyncRep</literal></entry> <entry>Waiting for confirmation from remote server during synchronous replication.</entry> </row> + <row> + <entry><literal>FdwXactResolution</literal></entry> + <entry>Waiting for all foreign transaction participants to be resolved during atomic commit among foreign servers.</entry> + </row> <row> <entry morerows="2"><literal>Timeout</literal></entry> <entry><literal>BaseBackupThrottle</literal></entry> @@ -2359,6 +2383,42 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i connection. </para> + <table id="pg-stat-foreign-xact-view" xreflabel="pg_stat_foreign_xact"> + <title><structname>pg_stat_foreign_xact</structname> View</title> + <tgroup cols="3"> + <thead> + <row> + <entry>Column</entry> + <entry>Type</entry> + <entry>Description</entry> + </row> + </thead> + + <tbody> + <row> + <entry><structfield>pid</structfield></entry> + <entry><type>integer</type></entry> + <entry>Process ID of a foreign transaction resolver process</entry> + </row> + <row> + <entry><structfield>dbid</structfield></entry> + <entry><type>oid</type></entry> + <entry>OID of the database to which the foreign transaction resolver is connected</entry> + </row> + <row> + <entry><structfield>last_resolved_time</structfield></entry> + <entry><type>timestamp with time zone</type></entry> + <entry>Time at which the process last resolved a foreign transaction</entry> + </row> + </tbody> + </tgroup> + </table> + + <para> + The <structname>pg_stat_foreign_xact</structname> view contains one + row per foreign transaction resolver process, showing the state of resolution + of foreign transactions.
+ </para> <table id="pg-stat-archiver-view" xreflabel="pg_stat_archiver"> <title><structname>pg_stat_archiver</structname> View</title> diff --git a/doc/src/sgml/postgres.sgml b/doc/src/sgml/postgres.sgml index e59cba7997..dee3f72f7e 100644 --- a/doc/src/sgml/postgres.sgml +++ b/doc/src/sgml/postgres.sgml @@ -163,6 +163,7 @@ &wal; &logical-replication; &jit; + &distributed-transaction; &regress; </part> diff --git a/doc/src/sgml/storage.sgml b/doc/src/sgml/storage.sgml index 1c19e863d2..3f4c806ed1 100644 --- a/doc/src/sgml/storage.sgml +++ b/doc/src/sgml/storage.sgml @@ -83,6 +83,12 @@ Item subsystem</entry> </row> +<row> + <entry><filename>pg_fdwxact</filename></entry> + <entry>Subdirectory containing files used by the distributed transaction + manager subsystem</entry> +</row> + <row> <entry><filename>pg_logical</filename></entry> <entry>Subdirectory containing status data for logical decoding</entry> -- 2.23.0 From 84f81fdcb2bd823e34edba79c81c29871d7906fb Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horikyota.ntt@gmail.com> Date: Thu, 5 Dec 2019 17:01:15 +0900 Subject: [PATCH v26 4/5] postgres_fdw supports atomic commit APIs.
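For reviewers checking this patch against the callback contract documented in patch 3/5, here is a minimal C sketch of the documented commit sequence. It is hypothetical and is not postgres_fdw's code. The sequence is:

- Send PREPARE TRANSACTION to every participant.
- Commit locally only if every prepare succeeded.
- Resolve each participant with COMMIT PREPARED. If anything failed, send ROLLBACK PREPARED, or a plain ROLLBACK for participants that never prepared.

Participant, send_sql() (which only records commands and does not call libpq), make_prepare_id() and resolution_error_is_tolerable() are illustrative names, not patch APIs. In the patch itself, the resolution step runs asynchronously in a foreign transaction resolver process, not inline as shown here.

```c
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define NAMEDATALEN 64          /* PostgreSQL default; identifiers must be shorter */
#define MAX_SQL     128
#define MAX_LOG     16

/* Hypothetical participant: one foreign server written to by the local xact. */
typedef struct Participant
{
    unsigned int serverid;
    unsigned int userid;
    bool        prepare_ok;     /* simulated outcome of PREPARE TRANSACTION */
    bool        prepared;       /* true once PREPARE TRANSACTION succeeded */
    char        id[NAMEDATALEN];
} Participant;

/* Hypothetical stand-in for sending SQL over libpq: it only records commands. */
static char sql_log[MAX_LOG][MAX_SQL];
static int  n_sql = 0;

static void
send_sql(const char *sql)
{
    if (n_sql < MAX_LOG)
        snprintf(sql_log[n_sql++], MAX_SQL, "%s", sql);
}

/*
 * Build an identifier in the documented default form
 * fx_<random>_<server oid>_<user oid>; the random part is supplied by the
 * caller. Returns false if it would not fit in NAMEDATALEN - 1 bytes.
 */
static bool
make_prepare_id(char *buf, unsigned int rnd, unsigned int serverid,
                unsigned int userid)
{
    int         n = snprintf(buf, NAMEDATALEN, "fx_%u_%u_%u",
                             rnd, serverid, userid);

    return n > 0 && n < NAMEDATALEN;
}

/* SQLSTATE 42704 (undefined_object): the prepared xact no longer exists. */
static bool
resolution_error_is_tolerable(const char *sqlstate)
{
    return strcmp(sqlstate, "42704") == 0;
}

/* Returns true if the distributed transaction committed. */
static bool
commit_distributed(Participant *parts, int n, bool (*commit_local) (void))
{
    char        sql[MAX_SQL];
    bool        all_prepared = true;
    bool        committed;

    for (int i = 0; i < n; i++)
        parts[i].prepared = false;

    /* Step 1: prepare on each foreign server, stopping at the first failure. */
    for (int i = 0; i < n && all_prepared; i++)
    {
        snprintf(sql, sizeof(sql), "PREPARE TRANSACTION '%s'", parts[i].id);
        send_sql(sql);
        parts[i].prepared = parts[i].prepare_ok;
        all_prepared = parts[i].prepare_ok;
    }

    /* Step 2: commit locally only if every participant prepared. */
    committed = all_prepared && commit_local();

    /* Step 3: resolve; participants that did not prepare get a plain ROLLBACK. */
    for (int i = 0; i < n; i++)
    {
        if (parts[i].prepared)
            snprintf(sql, sizeof(sql), "%s PREPARED '%s'",
                     committed ? "COMMIT" : "ROLLBACK", parts[i].id);
        else
            snprintf(sql, sizeof(sql), "ROLLBACK");
        send_sql(sql);
    }
    return committed;
}
```

Suppose two participants both prepare. The recorded commands are then two PREPARE TRANSACTION statements followed by two COMMIT PREPARED statements. If the second PREPARE fails instead, the first participant receives ROLLBACK PREPARED and the second a plain ROLLBACK. The SQLSTATE 42704 check corresponds to the documented requirement that the commit and rollback callbacks tolerate a prepared transaction that no longer exists.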
Original Author: Masahiko Sawada <sawada.mshk@gmail.com> --- contrib/postgres_fdw/Makefile | 7 +- contrib/postgres_fdw/connection.c | 604 +++++++++++------- .../postgres_fdw/expected/postgres_fdw.out | 265 +++++++- contrib/postgres_fdw/fdwxact.conf | 3 + contrib/postgres_fdw/postgres_fdw.c | 21 +- contrib/postgres_fdw/postgres_fdw.h | 7 +- contrib/postgres_fdw/sql/postgres_fdw.sql | 120 +++- doc/src/sgml/postgres-fdw.sgml | 45 ++ 8 files changed, 822 insertions(+), 250 deletions(-) create mode 100644 contrib/postgres_fdw/fdwxact.conf diff --git a/contrib/postgres_fdw/Makefile b/contrib/postgres_fdw/Makefile index ee8a80a392..91fa6e39fc 100644 --- a/contrib/postgres_fdw/Makefile +++ b/contrib/postgres_fdw/Makefile @@ -16,7 +16,7 @@ SHLIB_LINK_INTERNAL = $(libpq) EXTENSION = postgres_fdw DATA = postgres_fdw--1.0.sql -REGRESS = postgres_fdw +REGRESSCHECK = postgres_fdw ifdef USE_PGXS PG_CONFIG = pg_config @@ -29,3 +29,8 @@ top_builddir = ../.. include $(top_builddir)/src/Makefile.global include $(top_srcdir)/contrib/contrib-global.mk endif + +check: + $(pg_regress_check) \ + --temp-config $(top_srcdir)/contrib/postgres_fdw/fdwxact.conf \ + $(REGRESSCHECK) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 27b86a03f8..0b07e6c5cc 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -1,7 +1,7 @@ /*------------------------------------------------------------------------- * * connection.c - * Connection management functions for postgres_fdw + * Connection and transaction management functions for postgres_fdw * * Portions Copyright (c) 2012-2019, PostgreSQL Global Development Group * @@ -12,6 +12,7 @@ */ #include "postgres.h" +#include "access/fdwxact.h" #include "access/htup_details.h" #include "access/xact.h" #include "catalog/pg_user_mapping.h" @@ -54,6 +55,7 @@ typedef struct ConnCacheEntry bool have_error; /* have any subxacts aborted in this xact? 
*/ bool changing_xact_state; /* xact state change in process */ bool invalidated; /* true if reconnect is pending */ + bool xact_got_connection; uint32 server_hashvalue; /* hash value of foreign server OID */ uint32 mapping_hashvalue; /* hash value of user mapping OID */ } ConnCacheEntry; @@ -67,17 +69,13 @@ static HTAB *ConnectionHash = NULL; static unsigned int cursor_number = 0; static unsigned int prep_stmt_number = 0; -/* tracks whether any work is needed in callback functions */ -static bool xact_got_connection = false; - /* prototypes of private functions */ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); static void disconnect_pg_server(ConnCacheEntry *entry); static void check_conn_params(const char **keywords, const char **values, UserMapping *user); static void configure_remote_session(PGconn *conn); static void do_sql_command(PGconn *conn, const char *sql); -static void begin_remote_xact(ConnCacheEntry *entry); -static void pgfdw_xact_callback(XactEvent event, void *arg); +static void begin_remote_xact(ConnCacheEntry *entry, Oid serverid, Oid userid); static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, @@ -89,24 +87,26 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors); static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result); - +static void pgfdw_end_prepared_xact(ConnCacheEntry *entry, char *fdwxact_id, + bool is_commit); +static void pgfdw_cleanup_after_transaction(ConnCacheEntry *entry); +static ConnCacheEntry *GetConnectionState(Oid umid, bool will_prep_stmt, + bool start_transaction); +static ConnCacheEntry *GetConnectionCacheEntry(Oid umid); /* - * Get a PGconn which can be used to execute queries on the remote PostgreSQL - * server with the user's authorization. 
A new connection is established - * if we don't already have a suitable one, and a transaction is opened at - * the right subtransaction nesting depth if we didn't do that already. - * - * will_prep_stmt must be true if caller intends to create any prepared - * statements. Since those don't go away automatically at transaction end - * (not even on error), we need this flag to cue manual cleanup. + * Get the connection cache entry. Unlike GetConnectionState, this function + * doesn't establish a new connection if none exists yet. */ -PGconn * -GetConnection(UserMapping *user, bool will_prep_stmt) +static ConnCacheEntry * +GetConnectionCacheEntry(Oid umid) { - bool found; ConnCacheEntry *entry; - ConnCacheKey key; + ConnCacheKey key; + bool found; + + /* Create hash key for the entry. Assume no pad bytes in key struct */ + key = umid; /* First time through, initialize connection cache hashtable */ if (ConnectionHash == NULL) @@ -126,7 +126,6 @@ GetConnection(UserMapping *user, bool will_prep_stmt) * Register some callback functions that manage connection cleanup. * This should be done just once in each backend. */ - RegisterXactCallback(pgfdw_xact_callback, NULL); RegisterSubXactCallback(pgfdw_subxact_callback, NULL); CacheRegisterSyscacheCallback(FOREIGNSERVEROID, pgfdw_inval_callback, (Datum) 0); @@ -134,12 +133,6 @@ GetConnection(UserMapping *user, bool will_prep_stmt) pgfdw_inval_callback, (Datum) 0); } - /* Set flag that we did GetConnection during the current transaction */ - xact_got_connection = true; - - /* Create hash key for the entry. Assume no pad bytes in key struct */ - key = user->umid; - /* * Find or create cached entry for requested connection.
*/ @@ -153,6 +146,21 @@ GetConnection(UserMapping *user, bool will_prep_stmt) entry->conn = NULL; } + return entry; +} + +/* + * Get the connection cache entry, establish a connection to the foreign + * server if there is none yet, and start a new transaction if + * 'start_transaction' is true. + */ +static ConnCacheEntry * +GetConnectionState(Oid umid, bool will_prep_stmt, bool start_transaction) +{ + ConnCacheEntry *entry; + + entry = GetConnectionCacheEntry(umid); + /* Reject further use of connections which failed abort cleanup. */ pgfdw_reject_incomplete_xact_state_change(entry); @@ -180,6 +188,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt) */ if (entry->conn == NULL) { + UserMapping *user = GetUserMappingByOid(umid); ForeignServer *server = GetForeignServer(user->serverid); /* Reset all transient state fields, to be sure all are clean */ @@ -188,6 +197,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt) entry->have_error = false; entry->changing_xact_state = false; entry->invalidated = false; + entry->xact_got_connection = false; entry->server_hashvalue = GetSysCacheHashValue1(FOREIGNSERVEROID, ObjectIdGetDatum(server->serverid)); @@ -198,6 +208,15 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* Now try to make the connection */ entry->conn = connect_pg_server(server, user); + Assert(entry->conn); + + if (!entry->conn) + { + elog(DEBUG3, "attempt to connect to server \"%s\" by postgres_fdw failed", + server->servername); + return NULL; + } + elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)", entry->conn, server->servername, user->umid, user->userid); } @@ -205,11 +224,39 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* * Start a new transaction or subtransaction if needed.
*/ - begin_remote_xact(entry); + if (start_transaction) + { + UserMapping *user = GetUserMappingByOid(umid); + + begin_remote_xact(entry, user->serverid, user->userid); + + /* Set flag that we did GetConnection during the current transaction */ + entry->xact_got_connection = true; + } /* Remember if caller will prepare statements */ entry->have_prep_stmt |= will_prep_stmt; + return entry; +} + +/* + * Get a PGconn which can be used to execute queries on the remote PostgreSQL + * server with the user's authorization. A new connection is established + * if we don't already have a suitable one, and a transaction is opened at + * the right subtransaction nesting depth if we didn't do that already. + * + * will_prep_stmt must be true if caller intends to create any prepared + * statements. Since those don't go away automatically at transaction end + * (not even on error), we need this flag to cue manual cleanup. + */ +PGconn * +GetConnection(Oid umid, bool will_prep_stmt, bool start_transaction) +{ + ConnCacheEntry *entry; + + entry = GetConnectionState(umid, will_prep_stmt, start_transaction); + return entry->conn; } @@ -412,7 +459,7 @@ do_sql_command(PGconn *conn, const char *sql) * control which remote queries share a snapshot. */ static void -begin_remote_xact(ConnCacheEntry *entry) +begin_remote_xact(ConnCacheEntry *entry, Oid serverid, Oid userid) { int curlevel = GetCurrentTransactionNestLevel(); @@ -639,193 +686,6 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, PG_END_TRY(); } -/* - * pgfdw_xact_callback --- cleanup at main-transaction end. - */ -static void -pgfdw_xact_callback(XactEvent event, void *arg) -{ - HASH_SEQ_STATUS scan; - ConnCacheEntry *entry; - - /* Quick exit if no connections were touched in this transaction. */ - if (!xact_got_connection) - return; - - /* - * Scan all connection cache entries to find open remote transactions, and - * close them. 
- */ - hash_seq_init(&scan, ConnectionHash); - while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) - { - PGresult *res; - - /* Ignore cache entry if no open connection right now */ - if (entry->conn == NULL) - continue; - - /* If it has an open remote transaction, try to close it */ - if (entry->xact_depth > 0) - { - bool abort_cleanup_failure = false; - - elog(DEBUG3, "closing remote transaction on connection %p", - entry->conn); - - switch (event) - { - case XACT_EVENT_PARALLEL_PRE_COMMIT: - case XACT_EVENT_PRE_COMMIT: - - /* - * If abort cleanup previously failed for this connection, - * we can't issue any more commands against it. - */ - pgfdw_reject_incomplete_xact_state_change(entry); - - /* Commit all remote transactions during pre-commit */ - entry->changing_xact_state = true; - do_sql_command(entry->conn, "COMMIT TRANSACTION"); - entry->changing_xact_state = false; - - /* - * If there were any errors in subtransactions, and we - * made prepared statements, do a DEALLOCATE ALL to make - * sure we get rid of all prepared statements. This is - * annoying and not terribly bulletproof, but it's - * probably not worth trying harder. - * - * DEALLOCATE ALL only exists in 8.3 and later, so this - * constrains how old a server postgres_fdw can - * communicate with. We intentionally ignore errors in - * the DEALLOCATE, so that we can hobble along to some - * extent with older servers (leaking prepared statements - * as we go; but we don't really support update operations - * pre-8.3 anyway). - */ - if (entry->have_prep_stmt && entry->have_error) - { - res = PQexec(entry->conn, "DEALLOCATE ALL"); - PQclear(res); - } - entry->have_prep_stmt = false; - entry->have_error = false; - break; - case XACT_EVENT_PRE_PREPARE: - - /* - * We disallow any remote transactions, since it's not - * very reasonable to hold them open until the prepared - * transaction is committed. For the moment, throw error - * unconditionally; later we might allow read-only cases. 
- * Note that the error will cause us to come right back - * here with event == XACT_EVENT_ABORT, so we'll clean up - * the connection state at that point. - */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables"))); - break; - case XACT_EVENT_PARALLEL_COMMIT: - case XACT_EVENT_COMMIT: - case XACT_EVENT_PREPARE: - /* Pre-commit should have closed the open transaction */ - elog(ERROR, "missed cleaning up connection during pre-commit"); - break; - case XACT_EVENT_PARALLEL_ABORT: - case XACT_EVENT_ABORT: - - /* - * Don't try to clean up the connection if we're already - * in error recursion trouble. - */ - if (in_error_recursion_trouble()) - entry->changing_xact_state = true; - - /* - * If connection is already unsalvageable, don't touch it - * further. - */ - if (entry->changing_xact_state) - break; - - /* - * Mark this connection as in the process of changing - * transaction state. - */ - entry->changing_xact_state = true; - - /* Assume we might have lost track of prepared statements */ - entry->have_error = true; - - /* - * If a command has been submitted to the remote server by - * using an asynchronous execution function, the command - * might not have yet completed. Check to see if a - * command is still being processed by the remote server, - * and if so, request cancellation of the command. - */ - if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE && - !pgfdw_cancel_query(entry->conn)) - { - /* Unable to cancel running query. */ - abort_cleanup_failure = true; - } - else if (!pgfdw_exec_cleanup_query(entry->conn, - "ABORT TRANSACTION", - false)) - { - /* Unable to abort remote transaction. */ - abort_cleanup_failure = true; - } - else if (entry->have_prep_stmt && entry->have_error && - !pgfdw_exec_cleanup_query(entry->conn, - "DEALLOCATE ALL", - true)) - { - /* Trouble clearing prepared statements. 
*/ abort_cleanup_failure = true; - } - else - { - entry->have_prep_stmt = false; - entry->have_error = false; - } - - /* Disarm changing_xact_state if it all worked. */ - entry->changing_xact_state = abort_cleanup_failure; - break; - } - } - - /* Reset state to show we're out of a transaction */ - entry->xact_depth = 0; - - /* - * If the connection isn't in a good idle state, discard it to - * recover. Next GetConnection will open a new connection. - */ - if (PQstatus(entry->conn) != CONNECTION_OK || - PQtransactionStatus(entry->conn) != PQTRANS_IDLE || - entry->changing_xact_state) - { - elog(DEBUG3, "discarding connection %p", entry->conn); - disconnect_pg_server(entry); - } - } - - /* - * Regardless of the event type, we can now mark ourselves as out of the - * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE, - * this saves a useless scan of the hashtable during COMMIT or PREPARE.) - */ - xact_got_connection = false; - - /* Also reset cursor numbering for next transaction */ - cursor_number = 0; -} - /* * pgfdw_subxact_callback --- cleanup at subtransaction end. */ @@ -842,10 +702,6 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, event == SUBXACT_EVENT_ABORT_SUB)) return; - /* Quick exit if no connections were touched in this transaction. */ - if (!xact_got_connection) - return; - /* * Scan all connection cache entries to find open remote subtransactions * of the current level, and close them. @@ -856,6 +712,10 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, { char sql[100]; + /* Skip entries whose connection was not used in this transaction. */ + if (!entry->xact_got_connection) + continue; + /* * We only care about connections with open remote subtransactions of * the current level. @@ -1190,3 +1050,309 @@ exit: ; *result = last_res; return timed_out; } + +/* + * Prepare a transaction on a foreign server.
+ */ +void +postgresPrepareForeignTransaction(FdwXactRslvState *state) +{ + ConnCacheEntry *entry = NULL; + PGresult *res; + StringInfo command; + + /* The transaction should have already started, so just get the cache entry */ + entry = GetConnectionCacheEntry(state->usermapping->umid); + + /* The transaction should have been started */ + Assert(entry->xact_got_connection && entry->conn); + + pgfdw_reject_incomplete_xact_state_change(entry); + + command = makeStringInfo(); + appendStringInfo(command, "PREPARE TRANSACTION '%s'", state->fdwxact_id); + + /* Prepare the foreign transaction */ + entry->changing_xact_state = true; + res = pgfdw_exec_query(entry->conn, command->data); + entry->changing_xact_state = false; + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + ereport(ERROR, (errmsg("could not prepare transaction on server %s with ID %s", + state->server->servername, state->fdwxact_id))); + + elog(DEBUG1, "prepared foreign transaction on server %s with ID %s", + state->server->servername, state->fdwxact_id); + + if (entry->have_prep_stmt && entry->have_error) + { + res = PQexec(entry->conn, "DEALLOCATE ALL"); + PQclear(res); + } + + pgfdw_cleanup_after_transaction(entry); +} + +/* + * Commit a transaction or a prepared transaction on a foreign server. If + * state->flags contains FDWXACT_FLAG_ONEPHASE, this function commits the + * foreign transaction without preparation; otherwise it commits the prepared + * transaction. + */ +void +postgresCommitForeignTransaction(FdwXactRslvState *state) +{ + ConnCacheEntry *entry = NULL; + bool is_onephase = (state->flags & FDWXACT_FLAG_ONEPHASE) != 0; + PGresult *res; + + if (!is_onephase) + { + /* + * In the two-phase commit case, the foreign transaction has been prepared + * and closed, so we might not have a connection to it. Get a connection + * but don't start a transaction.
+ */ + entry = GetConnectionState(state->usermapping->umid, false, false); + + /* COMMIT PREPARED the transaction */ + pgfdw_end_prepared_xact(entry, state->fdwxact_id, true); + return; + } + + /* + * In the simple commit case, we must have a connection to the foreign server + * because the foreign transaction is not closed yet. Get the connection + * entry from the cache. + */ + entry = GetConnectionCacheEntry(state->usermapping->umid); + Assert(entry); + + if (!entry->conn || !entry->xact_got_connection) + return; + + /* + * If abort cleanup previously failed for this connection, we can't issue + * any more commands against it. + */ + pgfdw_reject_incomplete_xact_state_change(entry); + + entry->changing_xact_state = true; + res = pgfdw_exec_query(entry->conn, "COMMIT TRANSACTION"); + entry->changing_xact_state = false; + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + ereport(ERROR, (errmsg("could not commit transaction on server %s", + state->server->servername))); + + /* + * If there were any errors in subtransactions, and we + * made prepared statements, do a DEALLOCATE ALL to make + * sure we get rid of all prepared statements. This is + * annoying and not terribly bulletproof, but it's + * probably not worth trying harder. + * + * DEALLOCATE ALL only exists in 8.3 and later, so this + * constrains how old a server postgres_fdw can + * communicate with. We intentionally ignore errors in + * the DEALLOCATE, so that we can hobble along to some + * extent with older servers (leaking prepared statements + * as we go; but we don't really support update operations + * pre-8.3 anyway). + */ + if (entry->have_prep_stmt && entry->have_error) + { + res = PQexec(entry->conn, "DEALLOCATE ALL"); + PQclear(res); + } + + /* Cleanup transaction status */ + pgfdw_cleanup_after_transaction(entry); +} + +/* + * Roll back a transaction on a foreign server.
As in the commit case, if state->flags + * contains FDWXACT_FLAG_ONEPHASE, this function rolls back the foreign + * transaction without preparation; otherwise it rolls back the prepared transaction. + * This function must tolerate being called recursively, as an error can happen + * during abort. + */ +void +postgresRollbackForeignTransaction(FdwXactRslvState *state) +{ + bool is_onephase = (state->flags & FDWXACT_FLAG_ONEPHASE) != 0; + ConnCacheEntry *entry = NULL; + bool abort_cleanup_failure = false; + + if (!is_onephase) + { + /* + * In the two-phase commit case, the foreign transaction has been prepared + * and closed, so we might not have a connection to it. Get a connection + * but don't start a transaction. + */ + entry = GetConnectionState(state->usermapping->umid, false, false); + + /* ROLLBACK PREPARED the transaction */ + pgfdw_end_prepared_xact(entry, state->fdwxact_id, false); + return; + } + + /* + * In the simple rollback case, we must have a connection to the foreign server + * because the foreign transaction is not closed yet. Get the connection + * entry from the cache. + */ + entry = GetConnectionCacheEntry(state->usermapping->umid); + Assert(entry); + + /* + * Clean up the connection entry if the transaction failed before + * establishing a connection or starting a transaction. + */ + if (!entry->conn || !entry->xact_got_connection) + { + pgfdw_cleanup_after_transaction(entry); + return; + } + + /* + * Don't try to clean up the connection if we're already + * in error recursion trouble. + */ + if (in_error_recursion_trouble()) + entry->changing_xact_state = true; + + /* + * If no transaction has been started on the connection, or it is already + * unsalvageable, only do the cleanup and don't touch it further. + */ + if (entry->changing_xact_state || !entry->xact_got_connection) + { + pgfdw_cleanup_after_transaction(entry); + return; + } + + /* + * Mark this connection as in the process of changing + * transaction state.
+ */ + entry->changing_xact_state = true; + + /* Assume we might have lost track of prepared statements */ + entry->have_error = true; + + /* + * If a command has been submitted to the remote server by + * using an asynchronous execution function, the command + * might not have yet completed. Check to see if a + * command is still being processed by the remote server, + * and if so, request cancellation of the command. + */ + if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE && + !pgfdw_cancel_query(entry->conn)) + { + /* Unable to cancel running query. */ + abort_cleanup_failure = true; + } + else if (!pgfdw_exec_cleanup_query(entry->conn, + "ABORT TRANSACTION", + false)) + { + /* Unable to abort remote transaction. */ + abort_cleanup_failure = true; + } + else if (entry->have_prep_stmt && entry->have_error && + !pgfdw_exec_cleanup_query(entry->conn, + "DEALLOCATE ALL", + true)) + { + /* Trouble clearing prepared statements. */ + abort_cleanup_failure = true; + } + + /* Disarm changing_xact_state if it all worked. */ + entry->changing_xact_state = abort_cleanup_failure; + + /* Cleanup transaction status */ + pgfdw_cleanup_after_transaction(entry); + + return; +} + +/* + * Commit or rollback prepared transaction on the foreign server. + */ +static void +pgfdw_end_prepared_xact(ConnCacheEntry *entry, char *fdwxact_id, bool is_commit) +{ + StringInfo command; + PGresult *res; + + command = makeStringInfo(); + appendStringInfo(command, "%s PREPARED '%s'", + is_commit ? 
"COMMIT" : "ROLLBACK", + fdwxact_id); + + res = pgfdw_exec_query(entry->conn, command->data); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + int sqlstate; + char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); + + if (diag_sqlstate) + { + sqlstate = MAKE_SQLSTATE(diag_sqlstate[0], + diag_sqlstate[1], + diag_sqlstate[2], + diag_sqlstate[3], + diag_sqlstate[4]); + } + else + sqlstate = ERRCODE_CONNECTION_FAILURE; + + /* + * As noted in the core global transaction manager, the given foreign + * transaction might not exist on the foreign server, so we must + * accept an UNDEFINED_OBJECT error. + */ + if (sqlstate != ERRCODE_UNDEFINED_OBJECT) + pgfdw_report_error(ERROR, res, entry->conn, false, command->data); + } + + elog(DEBUG1, "%s prepared foreign transaction with ID %s", + is_commit ? "commit" : "rollback", + fdwxact_id); + + /* Cleanup transaction status */ + pgfdw_cleanup_after_transaction(entry); +} + +/* Cleanup at main-transaction end */ +static void +pgfdw_cleanup_after_transaction(ConnCacheEntry *entry) +{ + /* Reset state to show we're out of a transaction */ + entry->xact_depth = 0; + entry->have_prep_stmt = false; + entry->have_error = false; + entry->xact_got_connection = false; + + /* + * If the connection isn't in a good idle state, discard it to + * recover. Next GetConnection will open a new connection.
+ */ + if (PQstatus(entry->conn) != CONNECTION_OK || + PQtransactionStatus(entry->conn) != PQTRANS_IDLE || + entry->changing_xact_state) + { + elog(DEBUG3, "discarding connection %p", entry->conn); + disconnect_pg_server(entry); + } + + entry->changing_xact_state = false; + + /* Also reset cursor numbering for next transaction */ + cursor_number = 0; +} diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 48282ab151..0ee91a49ac 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -13,12 +13,17 @@ DO $d$ OPTIONS (dbname '$$||current_database()||$$', port '$$||current_setting('port')||$$' )$$; + EXECUTE $$CREATE SERVER loopback3 FOREIGN DATA WRAPPER postgres_fdw + OPTIONS (dbname '$$||current_database()||$$', + port '$$||current_setting('port')||$$' + )$$; END; $d$; CREATE USER MAPPING FOR public SERVER testserver1 OPTIONS (user 'value', password 'value'); CREATE USER MAPPING FOR CURRENT_USER SERVER loopback; CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2; +CREATE USER MAPPING FOR CURRENT_USER SERVER loopback3; -- =================================================================== -- create objects used through FDW loopback server -- =================================================================== @@ -52,6 +57,13 @@ CREATE TABLE "S 1"."T 4" ( c3 text, CONSTRAINT t4_pkey PRIMARY KEY (c1) ); +CREATE TABLE "S 1"."T 5" ( + c1 int NOT NULL +); +CREATE TABLE "S 1"."T 6" ( + c1 int NOT NULL, + CONSTRAINT t6_pkey PRIMARY KEY (c1) +); -- Disable autovacuum for these tables to avoid unexpected effects of that ALTER TABLE "S 1"."T 1" SET (autovacuum_enabled = 'false'); ALTER TABLE "S 1"."T 2" SET (autovacuum_enabled = 'false'); @@ -87,6 +99,7 @@ ANALYZE "S 1"."T 1"; ANALYZE "S 1"."T 2"; ANALYZE "S 1"."T 3"; ANALYZE "S 1"."T 4"; +ANALYZE "S 1"."T 5"; -- =================================================================== -- create foreign tables 
-- =================================================================== @@ -129,6 +142,12 @@ CREATE FOREIGN TABLE ft6 ( c2 int NOT NULL, c3 text ) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 4'); +CREATE FOREIGN TABLE ft7_2pc ( + c1 int NOT NULL +) SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 5'); +CREATE FOREIGN TABLE ft8_2pc ( + c1 int NOT NULL +) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 5'); -- =================================================================== -- tests for validator -- =================================================================== @@ -179,15 +198,17 @@ ALTER FOREIGN TABLE ft2 OPTIONS (schema_name 'S 1', table_name 'T 1'); ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); \det+ - List of foreign tables - Schema | Table | Server | FDW options | Description ---------+-------+-----------+---------------------------------------+------------- - public | ft1 | loopback | (schema_name 'S 1', table_name 'T 1') | - public | ft2 | loopback | (schema_name 'S 1', table_name 'T 1') | - public | ft4 | loopback | (schema_name 'S 1', table_name 'T 3') | - public | ft5 | loopback | (schema_name 'S 1', table_name 'T 4') | - public | ft6 | loopback2 | (schema_name 'S 1', table_name 'T 4') | -(5 rows) + List of foreign tables + Schema | Table | Server | FDW options | Description +--------+---------+-----------+---------------------------------------+------------- + public | ft1 | loopback | (schema_name 'S 1', table_name 'T 1') | + public | ft2 | loopback | (schema_name 'S 1', table_name 'T 1') | + public | ft4 | loopback | (schema_name 'S 1', table_name 'T 3') | + public | ft5 | loopback | (schema_name 'S 1', table_name 'T 4') | + public | ft6 | loopback2 | (schema_name 'S 1', table_name 'T 4') | + public | ft7_2pc | loopback | (schema_name 'S 1', table_name 'T 5') | + public | ft8_2pc | loopback2 | (schema_name 'S 1', 
table_name 'T 5') | +(7 rows) -- Test that alteration of server options causes reconnection -- Remote's errors might be non-English, so hide them to ensure stable results @@ -8781,16 +8802,226 @@ SELECT b, avg(a), max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700 -- Clean-up RESET enable_partitionwise_aggregate; --- Two-phase transactions are not supported. + +-- =================================================================== +-- test distributed atomic commit across foreign servers +-- =================================================================== +-- Enable atomic commit +SET foreign_twophase_commit TO 'required'; +-- Modify single foreign server and then commit and rollback. +BEGIN; +INSERT INTO ft7_2pc VALUES(1); +COMMIT; +SELECT * FROM ft7_2pc; + c1 +---- + 1 +(1 row) + +BEGIN; +INSERT INTO ft7_2pc VALUES(1); +ROLLBACK; +SELECT * FROM ft7_2pc; + c1 +---- + 1 +(1 row) + +-- Modify two servers then commit and rollback. This requires to use 2PC. +BEGIN; +INSERT INTO ft7_2pc VALUES(2); +INSERT INTO ft8_2pc VALUES(2); +COMMIT; +SELECT * FROM ft8_2pc; + c1 +---- + 1 + 2 + 2 +(3 rows) + +BEGIN; +INSERT INTO ft7_2pc VALUES(2); +INSERT INTO ft8_2pc VALUES(2); +ROLLBACK; +SELECT * FROM ft8_2pc; + c1 +---- + 1 + 2 + 2 +(3 rows) + +-- Modify both local data and 2PC-capable server then commit and rollback. +-- This also requires to use 2PC. +BEGIN; +INSERT INTO ft7_2pc VALUES(3); +INSERT INTO "S 1"."T 6" VALUES (3); +COMMIT; +SELECT * FROM ft7_2pc; + c1 +---- + 1 + 2 + 2 + 3 +(4 rows) + +SELECT * FROM "S 1"."T 6"; + c1 +---- + 3 +(1 row) + BEGIN; -SELECT count(*) FROM ft1; +INSERT INTO ft7_2pc VALUES(3); +INSERT INTO "S 1"."T 6" VALUES (3); +ERROR: duplicate key value violates unique constraint "t6_pkey" +DETAIL: Key (c1)=(3) already exists. +ROLLBACK; +SELECT * FROM ft7_2pc; + c1 +---- + 1 + 2 + 2 + 3 +(4 rows) + +SELECT * FROM "S 1"."T 6"; + c1 +---- + 3 +(1 row) + +-- Modify foreign server and raise an error. No data changed. 
+BEGIN; +INSERT INTO ft7_2pc VALUES(4); +INSERT INTO ft8_2pc VALUES(NULL); -- violation +ERROR: null value in column "c1" violates not-null constraint +DETAIL: Failing row contains (null). +CONTEXT: remote SQL command: INSERT INTO "S 1"."T 5"(c1) VALUES ($1) +ROLLBACK; +SELECT * FROM ft8_2pc; + c1 +---- + 1 + 2 + 2 + 3 +(4 rows) + +BEGIN; +INSERT INTO ft7_2pc VALUES (5); +INSERT INTO ft8_2pc VALUES (5); +SAVEPOINT S1; +INSERT INTO ft7_2pc VALUES (6); +INSERT INTO ft8_2pc VALUES (6); +ROLLBACK TO S1; +COMMIT; +SELECT * FROM ft7_2pc; + c1 +---- + 1 + 2 + 2 + 3 + 5 + 5 +(6 rows) + +SELECT * FROM ft8_2pc; + c1 +---- + 1 + 2 + 2 + 3 + 5 + 5 +(6 rows) + +RELEASE SAVEPOINT S1; +ERROR: RELEASE SAVEPOINT can only be used in transaction blocks +-- When set to 'disabled', we can commit it +SET foreign_twophase_commit TO 'disabled'; +BEGIN; +INSERT INTO ft7_2pc VALUES(8); +INSERT INTO ft8_2pc VALUES(8); +COMMIT; -- success +SELECT * FROM ft7_2pc; + c1 +---- + 1 + 2 + 2 + 3 + 5 + 5 + 8 + 8 +(8 rows) + +SELECT * FROM ft8_2pc; + c1 +---- + 1 + 2 + 2 + 3 + 5 + 5 + 8 + 8 +(8 rows) + +SET foreign_twophase_commit TO 'required'; +-- Commit and rollback foreign transactions that are part of +-- prepare transaction. 
+BEGIN; +INSERT INTO ft7_2pc VALUES(9); +INSERT INTO ft8_2pc VALUES(9); +PREPARE TRANSACTION 'gx1'; +COMMIT PREPARED 'gx1'; +SELECT * FROM ft8_2pc; + c1 +---- + 1 + 2 + 2 + 3 + 5 + 5 + 8 + 8 + 9 + 9 +(10 rows) + +BEGIN; +INSERT INTO ft7_2pc VALUES(9); +INSERT INTO ft8_2pc VALUES(9); +PREPARE TRANSACTION 'gx1'; +ROLLBACK PREPARED 'gx1'; +SELECT * FROM ft8_2pc; + c1 +---- + 1 + 2 + 2 + 3 + 5 + 5 + 8 + 8 + 9 + 9 +(10 rows) + +-- No entry remained +SELECT count(*) FROM pg_foreign_xacts; count ------- - 822 + 0 (1 row) --- error here -PREPARE TRANSACTION 'fdw_tpc'; -ERROR: cannot PREPARE a transaction that has operated on postgres_fdw foreign tables -ROLLBACK; -WARNING: there is no transaction in progress diff --git a/contrib/postgres_fdw/fdwxact.conf b/contrib/postgres_fdw/fdwxact.conf new file mode 100644 index 0000000000..3fdbf93cdb --- /dev/null +++ b/contrib/postgres_fdw/fdwxact.conf @@ -0,0 +1,3 @@ +max_prepared_transactions = 3 +max_prepared_foreign_transactions = 3 +max_foreign_transaction_resolvers = 2 diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index bdc21b36d1..9c63f0aa3b 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -14,6 +14,7 @@ #include <limits.h> +#include "access/fdwxact.h" #include "access/htup_details.h" #include "access/sysattr.h" #include "access/table.h" @@ -504,7 +505,6 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i); - /* * Foreign-data wrapper handler function: return a struct with pointers * to my callback routines. 
@@ -558,6 +558,11 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for upper relation push-down */ routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; + /* Support functions for foreign transactions */ + routine->PrepareForeignTransaction = postgresPrepareForeignTransaction; + routine->CommitForeignTransaction = postgresCommitForeignTransaction; + routine->RollbackForeignTransaction = postgresRollbackForeignTransaction; + PG_RETURN_POINTER(routine); } @@ -1434,7 +1439,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - fsstate->conn = GetConnection(user, false); + fsstate->conn = GetConnection(user->umid, false, true); /* Assign a unique ID for my cursor */ fsstate->cursor_number = GetCursorNumber(fsstate->conn); @@ -2372,7 +2377,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - dmstate->conn = GetConnection(user, false); + dmstate->conn = GetConnection(user->umid, false, true); /* Update the foreign-join-related fields. */ if (fsplan->scan.scanrelid == 0) @@ -2746,7 +2751,7 @@ estimate_path_cost_size(PlannerInfo *root, false, &retrieved_attrs, NULL); /* Get the remote estimate */ - conn = GetConnection(fpinfo->user, false); + conn = GetConnection(fpinfo->user->umid, false, true); get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -3566,7 +3571,7 @@ create_foreign_modify(EState *estate, user = GetUserMapping(userid, table->serverid); /* Open connection; report that we'll create a prepared statement. */ - fmstate->conn = GetConnection(user, true); + fmstate->conn = GetConnection(user->umid, true, true); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Set up remote query information. 
*/ @@ -4441,7 +4446,7 @@ postgresAnalyzeForeignTable(Relation relation, */ table = GetForeignTable(RelationGetRelid(relation)); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user->umid, false, true); /* * Construct command to get page count for relation. @@ -4527,7 +4532,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, table = GetForeignTable(RelationGetRelid(relation)); server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user->umid, false, true); /* * Construct cursor that retrieves whole rows from remote. @@ -4755,7 +4760,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) */ server = GetForeignServer(serverOid); mapping = GetUserMapping(GetUserId(), server->serverid); - conn = GetConnection(mapping, false); + conn = GetConnection(mapping->umid, false, true); /* Don't attempt to import collation if remote server hasn't got it */ if (PQserverVersion(conn) < 90100) diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index ea052872c3..d7ba45c8d2 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -13,6 +13,7 @@ #ifndef POSTGRES_FDW_H #define POSTGRES_FDW_H +#include "access/fdwxact.h" #include "foreign/foreign.h" #include "lib/stringinfo.h" #include "libpq-fe.h" @@ -129,7 +130,7 @@ extern int set_transmission_modes(void); extern void reset_transmission_modes(int nestlevel); /* in connection.c */ -extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt); +extern PGconn *GetConnection(Oid umid, bool will_prep_stmt, bool start_transaction); extern void ReleaseConnection(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); @@ -137,6 +138,9 @@ extern PGresult 
*pgfdw_get_result(PGconn *conn, const char *query); extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query); extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql); +extern void postgresPrepareForeignTransaction(FdwXactRslvState *state); +extern void postgresCommitForeignTransaction(FdwXactRslvState *state); +extern void postgresRollbackForeignTransaction(FdwXactRslvState *state); /* in option.c */ extern int ExtractConnectionOptions(List *defelems, @@ -203,6 +207,7 @@ extern void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, bool is_subquery, List **retrieved_attrs, List **params_list); extern const char *get_jointype_name(JoinType jointype); +extern bool server_uses_twophase_commit(ForeignServer *server); /* in shippable.c */ extern bool is_builtin(Oid objectId); diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 1c5c37b783..572077c57c 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -15,6 +15,10 @@ DO $d$ OPTIONS (dbname '$$||current_database()||$$', port '$$||current_setting('port')||$$' )$$; + EXECUTE $$CREATE SERVER loopback3 FOREIGN DATA WRAPPER postgres_fdw + OPTIONS (dbname '$$||current_database()||$$', + port '$$||current_setting('port')||$$' + )$$; END; $d$; @@ -22,6 +26,7 @@ CREATE USER MAPPING FOR public SERVER testserver1 OPTIONS (user 'value', password 'value'); CREATE USER MAPPING FOR CURRENT_USER SERVER loopback; CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2; +CREATE USER MAPPING FOR CURRENT_USER SERVER loopback3; -- =================================================================== -- create objects used through FDW loopback server @@ -56,6 +61,14 @@ CREATE TABLE "S 1"."T 4" ( c3 text, CONSTRAINT t4_pkey PRIMARY KEY (c1) ); +CREATE TABLE "S 1"."T 5" ( + c1 int NOT NULL +); + +CREATE TABLE "S 1"."T 6" ( + c1 int NOT NULL, + CONSTRAINT t6_pkey PRIMARY KEY (c1) 
+); -- Disable autovacuum for these tables to avoid unexpected effects of that ALTER TABLE "S 1"."T 1" SET (autovacuum_enabled = 'false'); @@ -94,6 +107,7 @@ ANALYZE "S 1"."T 1"; ANALYZE "S 1"."T 2"; ANALYZE "S 1"."T 3"; ANALYZE "S 1"."T 4"; +ANALYZE "S 1"."T 5"; -- =================================================================== -- create foreign tables @@ -142,6 +156,15 @@ CREATE FOREIGN TABLE ft6 ( c3 text ) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 4'); +CREATE FOREIGN TABLE ft7_2pc ( + c1 int NOT NULL +) SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 5'); + +CREATE FOREIGN TABLE ft8_2pc ( + c1 int NOT NULL +) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 5'); + + -- =================================================================== -- tests for validator -- =================================================================== @@ -2480,9 +2503,98 @@ SELECT b, avg(a), max(a), count(*) FROM pagg_tab GROUP BY b HAVING sum(a) < 700 -- Clean-up RESET enable_partitionwise_aggregate; --- Two-phase transactions are not supported. +-- =================================================================== +-- test distributed atomic commit across foreign servers +-- =================================================================== + +-- Enable atomic commit +SET foreign_twophase_commit TO 'required'; + +-- Modify single foreign server and then commit and rollback. +BEGIN; +INSERT INTO ft7_2pc VALUES(1); +COMMIT; +SELECT * FROM ft7_2pc; + BEGIN; -SELECT count(*) FROM ft1; --- error here -PREPARE TRANSACTION 'fdw_tpc'; +INSERT INTO ft7_2pc VALUES(1); ROLLBACK; +SELECT * FROM ft7_2pc; + +-- Modify two servers then commit and rollback. This requires to use 2PC. 
+BEGIN; +INSERT INTO ft7_2pc VALUES(2); +INSERT INTO ft8_2pc VALUES(2); +COMMIT; +SELECT * FROM ft8_2pc; + +BEGIN; +INSERT INTO ft7_2pc VALUES(2); +INSERT INTO ft8_2pc VALUES(2); +ROLLBACK; +SELECT * FROM ft8_2pc; + +-- Modify both local data and 2PC-capable server then commit and rollback. +-- This also requires to use 2PC. +BEGIN; +INSERT INTO ft7_2pc VALUES(3); +INSERT INTO "S 1"."T 6" VALUES (3); +COMMIT; +SELECT * FROM ft7_2pc; +SELECT * FROM "S 1"."T 6"; + +BEGIN; +INSERT INTO ft7_2pc VALUES(3); +INSERT INTO "S 1"."T 6" VALUES (3); +ROLLBACK; +SELECT * FROM ft7_2pc; +SELECT * FROM "S 1"."T 6"; + +-- Modify foreign server and raise an error. No data changed. +BEGIN; +INSERT INTO ft7_2pc VALUES(4); +INSERT INTO ft8_2pc VALUES(NULL); -- violation +ROLLBACK; +SELECT * FROM ft8_2pc; + +BEGIN; +INSERT INTO ft7_2pc VALUES (5); +INSERT INTO ft8_2pc VALUES (5); +SAVEPOINT S1; +INSERT INTO ft7_2pc VALUES (6); +INSERT INTO ft8_2pc VALUES (6); +ROLLBACK TO S1; +COMMIT; +SELECT * FROM ft7_2pc; +SELECT * FROM ft8_2pc; +RELEASE SAVEPOINT S1; + +-- When set to 'disabled', we can commit it +SET foreign_twophase_commit TO 'disabled'; +BEGIN; +INSERT INTO ft7_2pc VALUES(8); +INSERT INTO ft8_2pc VALUES(8); +COMMIT; -- success +SELECT * FROM ft7_2pc; +SELECT * FROM ft8_2pc; + +SET foreign_twophase_commit TO 'required'; + +-- Commit and rollback foreign transactions that are part of +-- prepare transaction. 
+BEGIN; +INSERT INTO ft7_2pc VALUES(9); +INSERT INTO ft8_2pc VALUES(9); +PREPARE TRANSACTION 'gx1'; +COMMIT PREPARED 'gx1'; +SELECT * FROM ft8_2pc; + +BEGIN; +INSERT INTO ft7_2pc VALUES(9); +INSERT INTO ft8_2pc VALUES(9); +PREPARE TRANSACTION 'gx1'; +ROLLBACK PREPARED 'gx1'; +SELECT * FROM ft8_2pc; + +-- No entry remained +SELECT count(*) FROM pg_foreign_xacts; diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml index 1d4bafd9f0..362f7be9e3 100644 --- a/doc/src/sgml/postgres-fdw.sgml +++ b/doc/src/sgml/postgres-fdw.sgml @@ -441,6 +441,43 @@ </para> </sect3> + + <sect3> + <title>Transaction Management Options</title> + + <para> + By default, if a transaction involves multiple remote servers, + the transaction on each remote server is committed or aborted independently. + Some of those transactions may fail to commit on their remote server while + others commit successfully. This may be overridden using the + following option: + </para> + + <variablelist> + + <varlistentry> + <term><literal>two_phase_commit</literal></term> + <listitem> + <para> + This option controls whether <filename>postgres_fdw</filename> allows + the use of two-phase commit when a transaction commits. This option can + only be specified for foreign servers, not per-table. + The default is <literal>false</literal>. + </para> + + <para> + If this option is enabled, <filename>postgres_fdw</filename> prepares + the transaction on each remote server and <productname>PostgreSQL</productname> + keeps track of the distributed transaction. + <xref linkend="guc-max-prepared-foreign-transactions"/> must be set to more + than 1 on the local server and <xref linkend="guc-max-prepared-transactions"/> + must be set to more than 1 on the remote server. + </para> + </listitem> + </varlistentry> + + </variablelist> + </sect3> </sect2> <sect2> @@ -468,6 +505,14 @@ managed by creating corresponding remote savepoints.
</para> + <para> + <filename>postgres_fdw</filename> uses the two-phase commit protocol during + transaction commits or aborts when atomic commit of a distributed + transaction (see <xref linkend="atomic-commit"/>) is required. The remote + server should therefore set <xref linkend="guc-max-prepared-transactions"/> to more + than one so that it can prepare the remote transaction. + </para> + <para> The remote transaction uses <literal>SERIALIZABLE</literal> isolation level when the local transaction has <literal>SERIALIZABLE</literal> -- 2.23.0 From 639d9156323594430ec4b2217a95bfcf08195e9d Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi <horikyota.ntt@gmail.com> Date: Thu, 5 Dec 2019 17:01:26 +0900 Subject: [PATCH v26 5/5] Add regression tests for atomic commit. Original Author: Masahiko Sawada <sawada.mshk@gmail.com> --- src/test/recovery/Makefile | 2 +- src/test/recovery/t/016_fdwxact.pl | 175 +++++++++++++++++++++++++++++ src/test/regress/pg_regress.c | 13 ++- 3 files changed, 185 insertions(+), 5 deletions(-) create mode 100644 src/test/recovery/t/016_fdwxact.pl diff --git a/src/test/recovery/Makefile b/src/test/recovery/Makefile index e66e69521f..b17429f501 100644 --- a/src/test/recovery/Makefile +++ b/src/test/recovery/Makefile @@ -9,7 +9,7 @@ # #------------------------------------------------------------------------- -EXTRA_INSTALL=contrib/test_decoding +EXTRA_INSTALL=contrib/test_decoding contrib/pageinspect contrib/postgres_fdw subdir = src/test/recovery top_builddir = ../../..
diff --git a/src/test/recovery/t/016_fdwxact.pl b/src/test/recovery/t/016_fdwxact.pl new file mode 100644 index 0000000000..9af9bb81dc --- /dev/null +++ b/src/test/recovery/t/016_fdwxact.pl @@ -0,0 +1,175 @@ +# Tests for transaction involving foreign servers +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 7; + +# Setup master node +my $node_master = get_new_node("master"); +my $node_standby = get_new_node("standby"); + +$node_master->init(allows_streaming => 1); +$node_master->append_conf('postgresql.conf', qq( +max_prepared_transactions = 10 +max_prepared_foreign_transactions = 10 +max_foreign_transaction_resolvers = 2 +foreign_transaction_resolver_timeout = 0 +foreign_transaction_resolution_retry_interval = 5s +foreign_twophase_commit = on +)); +$node_master->start; + +# Take backup from master node +my $backup_name = 'master_backup'; +$node_master->backup($backup_name); + +# Set up standby node +$node_standby->init_from_backup($node_master, $backup_name, + has_streaming => 1); +$node_standby->start; + +# Set up foreign nodes +my $node_fs1 = get_new_node("fs1"); +my $node_fs2 = get_new_node("fs2"); +my $fs1_port = $node_fs1->port; +my $fs2_port = $node_fs2->port; +$node_fs1->init; +$node_fs2->init; +$node_fs1->append_conf('postgresql.conf', qq(max_prepared_transactions = 10)); +$node_fs2->append_conf('postgresql.conf', qq(max_prepared_transactions = 10)); +$node_fs1->start; +$node_fs2->start; + +# Create foreign servers on the master node +$node_master->safe_psql('postgres', qq( +CREATE EXTENSION postgres_fdw +)); +$node_master->safe_psql('postgres', qq( +CREATE SERVER fs1 FOREIGN DATA WRAPPER postgres_fdw +OPTIONS (dbname 'postgres', port '$fs1_port'); +)); +$node_master->safe_psql('postgres', qq( +CREATE SERVER fs2 FOREIGN DATA WRAPPER postgres_fdw +OPTIONS (dbname 'postgres', port '$fs2_port'); +)); + +# Create user mapping on the master node +$node_master->safe_psql('postgres', qq( +CREATE USER MAPPING FOR CURRENT_USER 
SERVER fs1; +CREATE USER MAPPING FOR CURRENT_USER SERVER fs2; +)); + +# Create tables on foreign nodes and import them to the master node +$node_fs1->safe_psql('postgres', qq( +CREATE SCHEMA fs; +CREATE TABLE fs.t1 (c int); +)); +$node_fs2->safe_psql('postgres', qq( +CREATE SCHEMA fs; +CREATE TABLE fs.t2 (c int); +)); +$node_master->safe_psql('postgres', qq( +IMPORT FOREIGN SCHEMA fs FROM SERVER fs1 INTO public; +IMPORT FOREIGN SCHEMA fs FROM SERVER fs2 INTO public; +CREATE TABLE l_table (c int); +)); + +# Switch to synchronous replication +$node_master->safe_psql('postgres', qq( +ALTER SYSTEM SET synchronous_standby_names ='*'; +)); +$node_master->reload; + +my $result; + +# Prepare two transactions involving multiple foreign servers and shut down +# the master node. Check if we can commit and rollback the foreign transactions +# after normal recovery. +$node_master->safe_psql('postgres', qq( +BEGIN; +INSERT INTO t1 VALUES (1); +INSERT INTO t2 VALUES (1); +PREPARE TRANSACTION 'gxid1'; +BEGIN; +INSERT INTO t1 VALUES (2); +INSERT INTO t2 VALUES (2); +PREPARE TRANSACTION 'gxid2'; +)); + +$node_master->stop; +$node_master->start; + +# Commit and rollback foreign transactions after the recovery. +$result = $node_master->psql('postgres', qq(COMMIT PREPARED 'gxid1')); +is($result, 0, 'Commit foreign transactions after recovery'); +$result = $node_master->psql('postgres', qq(ROLLBACK PREPARED 'gxid2')); +is($result, 0, 'Rollback foreign transactions after recovery'); + +# +# Prepare two transactions involving multiple foreign servers and shut down +# the master node immediately. Check if we can commit and rollback the foreign +# transactions after crash recovery.
+# +$node_master->safe_psql('postgres', qq( +BEGIN; +INSERT INTO t1 VALUES (3); +INSERT INTO t2 VALUES (3); +PREPARE TRANSACTION 'gxid1'; +BEGIN; +INSERT INTO t1 VALUES (4); +INSERT INTO t2 VALUES (4); +PREPARE TRANSACTION 'gxid2'; +)); + +$node_master->teardown_node; +$node_master->start; + +# Commit and rollback foreign transactions after crash recovery. +$result = $node_master->psql('postgres', qq(COMMIT PREPARED 'gxid1')); +is($result, 0, 'Commit foreign transactions after crash recovery'); +$result = $node_master->psql('postgres', qq(ROLLBACK PREPARED 'gxid2')); +is($result, 0, 'Rollback foreign transactions after crash recovery'); + +# +# Commit a transaction involving foreign servers and shut down the master node +# immediately, before a checkpoint. Check that WAL replay cleans up +# its shared memory state and releases locks while replaying the commit. +# +$node_master->safe_psql('postgres', qq( +BEGIN; +INSERT INTO t1 VALUES (5); +INSERT INTO t2 VALUES (5); +COMMIT; +)); + +$node_master->teardown_node; +$node_master->start; + +$result = $node_master->safe_psql('postgres', qq( +SELECT count(*) FROM pg_foreign_xacts; +)); +is($result, 0, "Cleanup of shared memory state for foreign transactions"); + +# +# Check that the standby node can process prepared foreign transactions +# after promotion.
+# +$node_master->safe_psql('postgres', qq( +BEGIN; +INSERT INTO t1 VALUES (6); +INSERT INTO t2 VALUES (6); +PREPARE TRANSACTION 'gxid1'; +BEGIN; +INSERT INTO t1 VALUES (7); +INSERT INTO t2 VALUES (7); +PREPARE TRANSACTION 'gxid2'; +)); + +$node_master->teardown_node; +$node_standby->promote; + +$result = $node_standby->psql('postgres', qq(COMMIT PREPARED 'gxid1';)); +is($result, 0, 'Commit foreign transaction after promotion'); +$result = $node_standby->psql('postgres', qq(ROLLBACK PREPARED 'gxid2';)); +is($result, 0, 'Rollback foreign transaction after promotion'); diff --git a/src/test/regress/pg_regress.c b/src/test/regress/pg_regress.c index 297b8fbd6f..82a1e7d541 100644 --- a/src/test/regress/pg_regress.c +++ b/src/test/regress/pg_regress.c @@ -2336,9 +2336,12 @@ regression_main(int argc, char *argv[], init_function ifunc, test_function tfunc * Adjust the default postgresql.conf for regression testing. The user * can specify a file to be appended; in any case we expand logging * and set max_prepared_transactions to enable testing of prepared - * xacts. (Note: to reduce the probability of unexpected shmmax - * failures, don't set max_prepared_transactions any higher than - * actually needed by the prepared_xacts regression test.) + * xacts. We also set max_prepared_foreign_transactions and + * max_foreign_transaction_resolvers to enable testing of transactions + * involving multiple foreign servers. (Note: to reduce the probability + * of unexpected shmmax failures, don't set max_prepared_transactions + * any higher than actually needed by the prepared_xacts regression + * test.)
*/ snprintf(buf, sizeof(buf), "%s/data/postgresql.conf", temp_instance); pg_conf = fopen(buf, "a"); @@ -2353,7 +2356,9 @@ regression_main(int argc, char *argv[], init_function ifunc, test_function tfunc fputs("log_line_prefix = '%m [%p] %q%a '\n", pg_conf); fputs("log_lock_waits = on\n", pg_conf); fputs("log_temp_files = 128kB\n", pg_conf); - fputs("max_prepared_transactions = 2\n", pg_conf); + fputs("max_prepared_transactions = 3\n", pg_conf); + fputs("max_prepared_foreign_transactions = 2\n", pg_conf); + fputs("max_foreign_transaction_resolvers = 2\n", pg_conf); for (sl = temp_configs; sl != NULL; sl = sl->next) { -- 2.23.0
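The comments in the postgres_fdw regression test define when atomic commit has to use two-phase commit under foreign_twophase_commit = 'required'. A transaction that modified a single foreign server commits as before. A transaction that modified two foreign servers, or local data plus one foreign server, needs 2PC. The C sketch below encodes that rule as a standalone function. The enum, the function name and its parameters are hypothetical and are not part of the patch. The patch itself records local writes with XACT_FLAGS_WROTENONTEMPREL. Only the two setting values exercised by the test are modeled. We also assume that 'disabled' falls back to committing each remote transaction independently, which is how the documentation describes the default behaviour.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of the two foreign_twophase_commit values used in the test. */
typedef enum
{
	FTC_DISABLED,				/* 'disabled' */
	FTC_REQUIRED				/* 'required' */
} ForeignTwophaseCommit;

/*
 * Hypothetical helper: should this commit go through two-phase commit?
 *
 * wrote_local       - the transaction wrote a non-temporary local relation
 *                     (what XACT_FLAGS_WROTENONTEMPREL records in the patch)
 * n_foreign_written - number of distinct foreign servers it modified
 */
static bool
needs_twophase_commit(ForeignTwophaseCommit setting,
					  bool wrote_local, int n_foreign_written)
{
	int			participants;

	assert(n_foreign_written >= 0);

	/* Assumption: 'disabled' commits each remote transaction independently. */
	if (setting == FTC_DISABLED)
		return false;

	/* A lone writer can commit directly; two or more writers need 2PC. */
	participants = n_foreign_written + (wrote_local ? 1 : 0);
	return participants >= 2;
}
```

For example, needs_twophase_commit(FTC_REQUIRED, true, 1) is true. This matches the test that inserts into both ft7_2pc and the local table "S 1"."T 6" in one transaction. Read-only participants are deliberately not counted.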
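Once 2PC is required, the coordinator follows the textbook protocol. It first prepares every participant. Only if every prepare succeeds does it commit them all; otherwise it rolls everything back, so no server is left with a partial result. The following C sketch shows that decision in memory. The Participant type and two_phase_commit() are hypothetical. In the patch, postgres_fdw plugs into this flow through the PrepareForeignTransaction, CommitForeignTransaction and RollbackForeignTransaction callbacks. The recovery tests also show that prepared foreign transactions must survive a crash, and this sketch deliberately ignores that.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical states of one participant's remote transaction. */
typedef enum
{
	P_ACTIVE,					/* transaction still open */
	P_PREPARED,					/* PREPARE TRANSACTION succeeded */
	P_COMMITTED,				/* COMMIT PREPARED done */
	P_ABORTED					/* ROLLBACK or ROLLBACK PREPARED done */
} ParticipantState;

/* Hypothetical stand-in for one foreign server taking part in the commit. */
typedef struct
{
	const char *server;
	bool		prepare_ok;		/* simulated result of PREPARE TRANSACTION */
	ParticipantState state;
} Participant;

/*
 * Two-phase commit over n participants, kept entirely in memory.
 * Phase 1 prepares participants in order and stops at the first failure.
 * Phase 2 applies one outcome to everyone: commit if every prepare
 * succeeded, otherwise roll back (prepared and unprepared alike).
 * Returns true if the distributed transaction committed.
 */
static bool
two_phase_commit(Participant *parts, size_t n)
{
	bool		all_prepared = true;

	assert(parts != NULL || n == 0);

	/* Phase 1: prepare. */
	for (size_t i = 0; i < n; i++)
	{
		if (!parts[i].prepare_ok)
		{
			all_prepared = false;
			break;
		}
		parts[i].state = P_PREPARED;
	}

	/* Phase 2: the same decision for every participant. */
	for (size_t i = 0; i < n; i++)
		parts[i].state = all_prepared ? P_COMMITTED : P_ABORTED;

	return all_prepared;
}
```

This mirrors the regression test where a NOT NULL violation on ft8_2pc means that the row inserted through ft7_2pc is not kept either.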
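The regression and TAP tests also show how foreign transactions prepared under a local PREPARE TRANSACTION are finished. The later COMMIT PREPARED or ROLLBACK PREPARED completes them, including after a restart, a crash or a standby promotion. Afterwards pg_foreign_xacts holds no entries. The C sketch below models only that bookkeeping. The fixed-size table, the FdwXactEntry struct and the function names are hypothetical. They stand in for the patch's shared-memory state, with the capacity loosely modeled on max_prepared_foreign_transactions. No commands are actually sent to remote servers.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical capacity, loosely modeled on max_prepared_foreign_transactions. */
#define MAX_FDWXACTS 8

/* Hypothetical entry: one prepared foreign transaction awaiting resolution. */
typedef struct
{
	bool		in_use;
	char		local_gid[32];	/* GID given to the local PREPARE TRANSACTION */
	char		server[32];		/* foreign server holding the prepared xact */
} FdwXactEntry;

static FdwXactEntry fdwxacts[MAX_FDWXACTS];
static int	n_committed;		/* foreign xacts finished by COMMIT PREPARED */
static int	n_rolled_back;		/* foreign xacts finished by ROLLBACK PREPARED */

/* Remember that `server` prepared a transaction under local GID `gid`. */
static bool
register_fdwxact(const char *gid, const char *server)
{
	for (int i = 0; i < MAX_FDWXACTS; i++)
	{
		FdwXactEntry *e = &fdwxacts[i];

		if (e->in_use)
			continue;
		e->in_use = true;
		strncpy(e->local_gid, gid, sizeof(e->local_gid) - 1);
		e->local_gid[sizeof(e->local_gid) - 1] = '\0';
		strncpy(e->server, server, sizeof(e->server) - 1);
		e->server[sizeof(e->server) - 1] = '\0';
		return true;
	}
	return false;				/* table full */
}

/*
 * Finish every foreign transaction prepared under `gid` with the same
 * outcome as the local COMMIT PREPARED / ROLLBACK PREPARED, then forget
 * it.  A real resolver would send the command to each server here.
 * Returns the number of entries resolved.
 */
static int
resolve_fdwxacts(const char *gid, bool commit)
{
	int			resolved = 0;

	for (int i = 0; i < MAX_FDWXACTS; i++)
	{
		FdwXactEntry *e = &fdwxacts[i];

		if (!e->in_use || strcmp(e->local_gid, gid) != 0)
			continue;
		if (commit)
			n_committed++;
		else
			n_rolled_back++;
		e->in_use = false;
		resolved++;
	}
	return resolved;
}

/* Rough analogue of SELECT count(*) FROM pg_foreign_xacts. */
static int
count_fdwxacts(void)
{
	int			n = 0;

	for (int i = 0; i < MAX_FDWXACTS; i++)
		if (fdwxacts[i].in_use)
			n++;
	return n;
}
```

Keying the entries by the local GID lets a single COMMIT PREPARED 'gx1' finish the foreign transactions on both servers.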