From 9353ea0a43627d2e9ec62d40e566c6520cf26a89 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Fri, 28 Aug 2020 22:25:38 +0900 Subject: [PATCH v34 01/11] Introduce transaction manager for foreign transactions. The global transaciton manager manages the transactions initiated on the foreign server. This commit also adds both CommitForeignTransaction and RollbackForeignTransaction FDW APIs supporing only one-phase commit. FDW that implements these APIs can be managed by the global transaciton manager. So FDW is able to control its transaction using the foreign transaction manager, not using XactCallback. Co-authored-by: Masahiko Sawada, Ashutosh Bapat --- src/backend/access/transam/Makefile | 1 + src/backend/access/transam/fdwxact.c | 237 +++++++++++++++++++++++++++ src/backend/access/transam/xact.c | 14 ++ src/backend/foreign/foreign.c | 4 + src/include/access/fdwxact.h | 33 ++++ src/include/foreign/fdwapi.h | 12 ++ 6 files changed, 301 insertions(+) create mode 100644 src/backend/access/transam/fdwxact.c create mode 100644 src/include/access/fdwxact.h diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 595e02de72..b05a88549d 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global OBJS = \ clog.o \ commit_ts.o \ + fdwxact.o \ generic_xlog.o \ multixact.o \ parallel.o \ diff --git a/src/backend/access/transam/fdwxact.c b/src/backend/access/transam/fdwxact.c new file mode 100644 index 0000000000..03f351924b --- /dev/null +++ b/src/backend/access/transam/fdwxact.c @@ -0,0 +1,237 @@ +/*------------------------------------------------------------------------- + * + * fdwxact.c + * PostgreSQL global transaction manager for foreign servers. + * + * This module contains the code for managing transactions started on foreign + * servers. + * + * An FDW that implements both commit and rollback APIs can request to register + * the foreign transaction by FdwXactRegisterXact() to participate it to a + * group of distributed tranasction. The registered foreign transactions are + * identified by OIDs of server and user. On commit and rollback, the global + * transaction manager calls corresponding FDW API to end the tranasctions. + * + * Portions Copyright (c) 2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/access/transam/fdwxact.c + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/fdwxact.h" +#include "access/xlog.h" +#include "foreign/fdwapi.h" +#include "foreign/foreign.h" +#include "utils/memutils.h" + +/* Check the FdwXactParticipant is capable of two-phase commit */ +#define ServerSupportTransactionCallback(fdw_part) \ + (((FdwXactParticipant *)(fdw_part))->commit_foreign_xact_fn != NULL) + +/* + * Structure to bundle the foreign transaction participant. This struct + * needs to live until the end of transaction where we cannot look at + * syscaches. Therefore, this is allocated in the TopTransactionContext. + * + * Participants are identified by the pair of server OID and user OID, + * rather than user mapping OID. See README.fdwxact for the discussion. + */ +typedef struct FdwXactParticipant +{ + /* Foreign server and user mapping info, passed to callback routines */ + ForeignServer *server; + UserMapping *usermapping; + + /* Callbacks for foreign transaction */ + CommitForeignTransaction_function commit_foreign_xact_fn; + RollbackForeignTransaction_function rollback_foreign_xact_fn; +} FdwXactParticipant; + +/* + * List of foreign transactions involved in the transaction. A member of + * participants must support both commit and rollback APIs. + */ +static List *FdwXactParticipants = NIL; + +static void ForgetAllFdwXactParticipants(void); +static void FdwXactParticipantEndTransaction(FdwXactParticipant *fdw_part, + bool commit); +static FdwXactParticipant *create_fdwxact_participant(Oid serverid, Oid userid, + FdwRoutine *routine); + +/* + * Register the given foreign transaction identified by the given arguments + * as a participant of the transaction. + */ +void +FdwXactRegisterXact(Oid serverid, Oid userid) +{ + FdwXactParticipant *fdw_part; + MemoryContext old_ctx; + FdwRoutine *routine; + ListCell *lc; + + foreach(lc, FdwXactParticipants) + { + FdwXactParticipant *fdw_part = (FdwXactParticipant *) lfirst(lc); + + if (fdw_part->server->serverid == serverid && + fdw_part->usermapping->userid == userid) + { + /* Already registered */ + return; + } + } + + routine = GetFdwRoutineByServerId(serverid); + + /* + * Foreign server managed by the transaction manager must implement + * transaction callbacks. + */ + if (!routine->CommitForeignTransaction) + ereport(ERROR, + (errmsg("cannot register foreign server not supporting transaction callback"))); + + /* + * Participant's information is also used at the end of a transaction, + * where system cache are not available. Save it in TopTransactionContext + * so that these can live until the end of transaction. + */ + old_ctx = MemoryContextSwitchTo(TopTransactionContext); + + fdw_part = create_fdwxact_participant(serverid, userid, routine); + + /* Add to the participants list */ + FdwXactParticipants = lappend(FdwXactParticipants, fdw_part); + + /* Revert back the context */ + MemoryContextSwitchTo(old_ctx); +} + +/* Remove the given foreign server from FdwXactParticipants */ +void +FdwXactUnregisterXact(Oid serverid, Oid userid) +{ + ListCell *lc; + + foreach(lc, FdwXactParticipants) + { + FdwXactParticipant *fdw_part = (FdwXactParticipant *) lfirst(lc); + + if (fdw_part->server->serverid == serverid && + fdw_part->usermapping->userid == userid) + { + /* Remove the entry */ + FdwXactParticipants = + foreach_delete_current(FdwXactParticipants, lc); + break; + } + } +} + +/* Return palloc'd FdwXactParticipant variable */ +static FdwXactParticipant * +create_fdwxact_participant(Oid serverid, Oid userid, FdwRoutine *routine) +{ + FdwXactParticipant *fdw_part; + ForeignServer *foreign_server; + UserMapping *user_mapping; + + foreign_server = GetForeignServer(serverid); + user_mapping = GetUserMapping(userid, serverid); + + fdw_part = (FdwXactParticipant *) palloc(sizeof(FdwXactParticipant)); + + fdw_part->server = foreign_server; + fdw_part->usermapping = user_mapping; + fdw_part->commit_foreign_xact_fn = routine->CommitForeignTransaction; + fdw_part->rollback_foreign_xact_fn = routine->RollbackForeignTransaction; + + return fdw_part; +} + +/* + * The routine for committing or rolling back the given transaction participant. + */ +static void +FdwXactParticipantEndTransaction(FdwXactParticipant *fdw_part, bool commit) +{ + FdwXactInfo finfo; + + Assert(ServerSupportTransactionCallback(fdw_part)); + + finfo.server = fdw_part->server; + finfo.usermapping = fdw_part->usermapping; + finfo.flags = FDWXACT_FLAG_ONEPHASE; + + if (commit) + { + fdw_part->commit_foreign_xact_fn(&finfo); + elog(DEBUG1, "successfully committed the foreign transaction for server %u user %u", + fdw_part->usermapping->serverid, + fdw_part->usermapping->userid); + } + else + { + fdw_part->rollback_foreign_xact_fn(&finfo); + elog(DEBUG1, "successfully rolled back the foreign transaction for server %u user %u", + fdw_part->usermapping->serverid, + fdw_part->usermapping->userid); + } +} + +/* + * Clear the FdwXactParticipants list. + */ +static void +ForgetAllFdwXactParticipants(void) +{ + if (FdwXactParticipants == NIL) + return; + + list_free_deep(FdwXactParticipants); + FdwXactParticipants = NIL; +} + +/* + * Commit or rollback all foreign transactions. + */ +void +AtEOXact_FdwXact(bool is_commit) +{ + ListCell *lc; + + /* If there are no foreign servers involved, we have no business here */ + if (FdwXactParticipants == NIL) + return; + + Assert(!RecoveryInProgress()); + + /* Commit or rollback foreign transactions in the participant list */ + foreach(lc, FdwXactParticipants) + { + FdwXactParticipant *fdw_part = (FdwXactParticipant *) lfirst(lc); + + Assert(ServerSupportTransactionCallback(fdw_part)); + FdwXactParticipantEndTransaction(fdw_part, is_commit); + } + + ForgetAllFdwXactParticipants(); +} + +/* + * This function is called at PREPARE TRANSACTION. Since we don't support + * preparing foreign transactions yet, raise an error if the local transaction + * has any foreign transaction. + */ +void +AtPrepare_FdwXact(void) +{ + if (FdwXactParticipants != NIL) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot PREPARE a transaction that has operated on foreign tables"))); +} diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index a2068e3fd4..497abcb491 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -21,6 +21,7 @@ #include #include "access/commit_ts.h" +#include "access/fdwxact.h" #include "access/multixact.h" #include "access/parallel.h" #include "access/subtrans.h" @@ -2125,6 +2126,9 @@ CommitTransaction(void) CallXactCallbacks(is_parallel_worker ? XACT_EVENT_PARALLEL_PRE_COMMIT : XACT_EVENT_PRE_COMMIT); + /* Commit foreign transactions if any */ + AtEOXact_FdwXact(true); + /* If we might have parallel workers, clean them up now. */ if (IsInParallelMode()) AtEOXact_Parallel(true); @@ -2369,6 +2373,9 @@ PrepareTransaction(void) * the transaction-abort path. */ + /* Process foreign trasactions */ + AtPrepare_FdwXact(); + /* Shut down the deferred-trigger manager */ AfterTriggerEndXact(true); @@ -2737,6 +2744,13 @@ AbortTransaction(void) TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid); + /* + * Abort foreign transactions if any. This needs to be done before marking + * this transaction as not running since FDW's transaction callbacks might + * assume this transaction is still in progress. + */ + AtEOXact_FdwXact(false); + /* * Let others know about no transaction in progress by me. Note that this * must be done _before_ releasing locks we hold and _after_ diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c index 5564dc3a1e..f8eb4fa215 100644 --- a/src/backend/foreign/foreign.c +++ b/src/backend/foreign/foreign.c @@ -328,6 +328,10 @@ GetFdwRoutine(Oid fdwhandler) elog(ERROR, "foreign-data wrapper handler function %u did not return an FdwRoutine struct", fdwhandler); + /* The FDW must support both or nothing */ + Assert((routine->CommitForeignTransaction && routine->RollbackForeignTransaction) || + (!routine->CommitForeignTransaction && !routine->RollbackForeignTransaction)); + return routine; } diff --git a/src/include/access/fdwxact.h b/src/include/access/fdwxact.h new file mode 100644 index 0000000000..05b36ebf2b --- /dev/null +++ b/src/include/access/fdwxact.h @@ -0,0 +1,33 @@ +/* + * fdwxact.h + * + * PostgreSQL global transaction manager + * + * Portions Copyright (c) 2021, PostgreSQL Global Development Group + * + * src/include/access/fdwxact.h + */ +#ifndef FDWXACT_H +#define FDWXACT_H + +#include "foreign/foreign.h" + +/* Flag passed to FDW transaction management APIs */ +#define FDWXACT_FLAG_ONEPHASE 0x01 /* transaction can commit/rollback + * without preparation */ + +/* State data for foreign transaction resolution, passed to FDW callbacks */ +typedef struct FdwXactInfo +{ + /* Foreign transaction information */ + ForeignServer *server; + UserMapping *usermapping; + + int flags; /* OR of FDWXACT_FLAG_xx flags */ +} FdwXactInfo; + +/* Function declarations */ +extern void AtEOXact_FdwXact(bool is_commit); +extern void AtPrepare_FdwXact(void); + +#endif /* FDWXACT_H */ diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 248f78da45..79f62ac354 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -13,6 +13,7 @@ #define FDWAPI_H #include "access/parallel.h" +#include "access/fdwxact.h" #include "nodes/execnodes.h" #include "nodes/pathnodes.h" @@ -178,6 +179,9 @@ typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root, List *fdw_private, RelOptInfo *child_rel); +typedef void (*CommitForeignTransaction_function) (FdwXactInfo *finfo); +typedef void (*RollbackForeignTransaction_function) (FdwXactInfo *finfo); + /* * FdwRoutine is the struct returned by a foreign-data wrapper's handler * function. It provides pointers to the callback functions needed by the @@ -256,6 +260,10 @@ typedef struct FdwRoutine /* Support functions for path reparameterization. */ ReparameterizeForeignPathByChild_function ReparameterizeForeignPathByChild; + + /* Support functions for transaction management */ + CommitForeignTransaction_function CommitForeignTransaction; + RollbackForeignTransaction_function RollbackForeignTransaction; } FdwRoutine; @@ -269,4 +277,8 @@ extern bool IsImportableForeignTable(const char *tablename, ImportForeignSchemaStmt *stmt); extern Path *GetExistingLocalJoinPath(RelOptInfo *joinrel); +/* Functions in fdwxact/fdwxact.c */ +extern void FdwXactRegisterXact(Oid serverid, Oid userid); +extern void FdwXactUnregisterXact(Oid serverid, Oid userid); + #endif /* FDWAPI_H */ -- 2.27.0