From e1b1bbde5874b21f4cae4cb4be90a7f2001b5033 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Fri, 28 Aug 2020 22:25:38 +0900 Subject: [PATCH v37 1/9] Introduce transaction manager for foreign transactions. The global transaciton manager manages the transactions initiated on the foreign server. This commit also adds both CommitForeignTransaction and RollbackForeignTransaction FDW APIs supporing only one-phase commit. FDW that implements these APIs can be managed by the global transaciton manager. So FDW is able to control its transaction using the foreign transaction manager, not using XactCallback. Co-authored-by: Masahiko Sawada, Ashutosh Bapat --- src/backend/access/transam/Makefile | 1 + src/backend/access/transam/fdwxact.c | 223 +++++++++++++++++++++++++++ src/backend/access/transam/xact.c | 8 + src/backend/foreign/foreign.c | 4 + src/include/access/fdwxact.h | 34 ++++ src/include/foreign/fdwapi.h | 13 ++ 6 files changed, 283 insertions(+) create mode 100644 src/backend/access/transam/fdwxact.c create mode 100644 src/include/access/fdwxact.h diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 595e02de72..b05a88549d 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -15,6 +15,7 @@ include $(top_builddir)/src/Makefile.global OBJS = \ clog.o \ commit_ts.o \ + fdwxact.o \ generic_xlog.o \ multixact.o \ parallel.o \ diff --git a/src/backend/access/transam/fdwxact.c b/src/backend/access/transam/fdwxact.c new file mode 100644 index 0000000000..ae3fdbdf83 --- /dev/null +++ b/src/backend/access/transam/fdwxact.c @@ -0,0 +1,223 @@ +/*------------------------------------------------------------------------- + * + * fdwxact.c + * PostgreSQL global transaction manager for foreign servers. + * + * This module contains the code for managing transactions started on foreign + * servers. + * + * An FDW that implements both commit and rollback APIs can request to register + * the foreign transaction participant by FdwXactRegisterEntry() to participate + * it to a group of distributed tranasction. The registered foreign transactions + * are identified by user mapping OID. On commit and rollback, the global + * transaction manager calls corresponding FDW API to end the foreign + * tranasctions. + * + * Portions Copyright (c) 2021, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/access/transam/fdwxact.c + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/fdwxact.h" +#include "access/xact.h" +#include "access/xlog.h" +#include "catalog/pg_user_mapping.h" +#include "foreign/fdwapi.h" +#include "foreign/foreign.h" +#include "utils/memutils.h" +#include "utils/syscache.h" + +/* Initial size of the hash table */ +#define FDWXACT_HASH_SIZE 64 + +/* Check the FdwXactEntry supports commit (and rollback) callbacks */ +#define ServerSupportTransactionCallback(fdwent) \ + (((FdwXactEntry *)(fdwent))->commit_foreign_xact_fn != NULL) + +/* + * Structure to bundle the foreign transaction participant. + * + * Participants are identified by user mapping OID, rather than pair of + * user OID and server OID. See README.fdwxact for the discussion. + */ +typedef struct FdwXactEntry +{ + /* user mapping OID, hash key (must be first) */ + Oid umid; + + ForeignServer *server; + UserMapping *usermapping; + + /* Callbacks for foreign transaction */ + CommitForeignTransaction_function commit_foreign_xact_fn; + RollbackForeignTransaction_function rollback_foreign_xact_fn; +} FdwXactEntry; + +/* + * Foreign transaction participants involved in the current transaction. + * A member of participants must support both commit and rollback APIs + * (i.g., ServerSupportTransactionCallback() is true). + */ +static HTAB *FdwXactParticipants = NULL; + +/* Check the current transaction has at least one fdwxact participant */ +#define HasFdwXactParticipant() \ + (FdwXactParticipants != NULL && \ + hash_get_num_entries(FdwXactParticipants) > 0) + +static void EndFdwXactEntry(FdwXactEntry *fdwent, bool isCommit, + bool is_parallel_worker); +static void RemoveFdwXactEntry(Oid umid); + +/* + * Register the given foreign transaction participant identified by the + * given user mapping OID as a participant of the transaction. + */ +void +FdwXactRegisterEntry(UserMapping *usermapping) +{ + FdwXactEntry *fdwent; + FdwRoutine *routine; + Oid umid; + MemoryContext old_ctx; + bool found; + + Assert(IsTransactionState()); + + if (FdwXactParticipants == NULL) + { + HASHCTL ctl; + + ctl.keysize = sizeof(Oid); + ctl.entrysize = sizeof(FdwXactEntry); + + FdwXactParticipants = hash_create("fdw xact participants", + FDWXACT_HASH_SIZE, + &ctl, HASH_ELEM | HASH_BLOBS); + } + + umid = usermapping->umid; + fdwent = hash_search(FdwXactParticipants, (void *) &umid, HASH_ENTER, &found); + + if (found) + return; + + /* + * The participant information needs to live until the end of the + * transaction where syscache is not available, so we save them in + * TopTransactionContext. + */ + old_ctx = MemoryContextSwitchTo(TopTransactionContext); + + fdwent->usermapping = GetUserMapping(usermapping->userid, usermapping->serverid); + fdwent->server = GetForeignServer(usermapping->serverid); + + /* + * Foreign server managed by the transaction manager must implement + * transaction callbacks. + */ + routine = GetFdwRoutineByServerId(usermapping->serverid); + if (!routine->CommitForeignTransaction) + ereport(ERROR, + (errmsg("cannot register foreign server not supporting transaction callback"))); + + fdwent->commit_foreign_xact_fn = routine->CommitForeignTransaction; + fdwent->rollback_foreign_xact_fn = routine->RollbackForeignTransaction; + + MemoryContextSwitchTo(old_ctx); +} + +/* Remove the foreign transaction from FdwXactParticipants */ +void +FdwXactUnregisterEntry(UserMapping *usermapping) +{ + Assert(IsTransactionState()); + RemoveFdwXactEntry(usermapping->umid); +} + +/* + * Remove an FdwXactEntry identified by the given user mapping id from the + * hash table. + */ +static void +RemoveFdwXactEntry(Oid umid) +{ + (void) hash_search(FdwXactParticipants, (void *) &umid, HASH_REMOVE, NULL); +} + +/* + * Commit or rollback all foreign transactions. + */ +void +AtEOXact_FdwXact(bool isCommit, bool is_parallel_worker) +{ + FdwXactEntry *fdwent; + HASH_SEQ_STATUS scan; + + /* If there are no foreign servers involved, we have no business here */ + if (!HasFdwXactParticipant()) + return; + + hash_seq_init(&scan, FdwXactParticipants); + while ((fdwent = (FdwXactEntry *) hash_seq_search(&scan))) + { + Assert(ServerSupportTransactionCallback(fdwent)); + + /* Commit or rollback foreign transaction */ + EndFdwXactEntry(fdwent, isCommit, is_parallel_worker); + + /* + * Remove the entry so that we don't recursively process this foreign + * transaction. + */ + RemoveFdwXactEntry(fdwent->umid); + } + + Assert(!HasFdwXactParticipant()); +} + +/* + * The routine for committing or rolling back the given transaction participant. + */ +static void +EndFdwXactEntry(FdwXactEntry *fdwent, bool isCommit, bool is_parallel_worker) +{ + FdwXactInfo finfo; + + Assert(ServerSupportTransactionCallback(fdwent)); + + finfo.server = fdwent->server; + finfo.usermapping = fdwent->usermapping; + finfo.flags = FDWXACT_FLAG_ONEPHASE | + ((is_parallel_worker) ? FDWXACT_FLAG_PARALLEL_WORKER : 0); + + if (isCommit) + { + fdwent->commit_foreign_xact_fn(&finfo); + elog(DEBUG1, "successfully committed the foreign transaction for user mapping %u", + fdwent->umid); + } + else + { + fdwent->rollback_foreign_xact_fn(&finfo); + elog(DEBUG1, "successfully rolled back the foreign transaction for user mapping %u", + fdwent->umid); + } +} + +/* + * This function is called at PREPARE TRANSACTION. Since we don't support + * preparing foreign transactions for now, raise an error if the local transaction + * has any foreign transaction. + */ +void +AtPrepare_FdwXact(void) +{ + if (HasFdwXactParticipant()) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot PREPARE a transaction that has operated on foreign tables"))); +} diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 441445927e..1e00a3a98e 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -21,6 +21,7 @@ #include #include "access/commit_ts.h" +#include "access/fdwxact.h" #include "access/multixact.h" #include "access/parallel.h" #include "access/subtrans.h" @@ -2129,6 +2130,9 @@ CommitTransaction(void) if (IsInParallelMode()) AtEOXact_Parallel(true); + /* Call foreign transaction callbacks at pre-commit phase, if any */ + AtEOXact_FdwXact(true, is_parallel_worker); + /* Shut down the deferred-trigger manager */ AfterTriggerEndXact(true); @@ -2369,6 +2373,9 @@ PrepareTransaction(void) * the transaction-abort path. */ + /* Process foreign trasactions */ + AtPrepare_FdwXact(); + /* Shut down the deferred-trigger manager */ AfterTriggerEndXact(true); @@ -2705,6 +2712,7 @@ AbortTransaction(void) AtAbort_Notify(); AtEOXact_RelationMap(false, is_parallel_worker); AtAbort_Twophase(); + AtEOXact_FdwXact(false, is_parallel_worker); /* * Advertise the fact that we aborted in pg_xact (assuming that we got as diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c index 5564dc3a1e..f8eb4fa215 100644 --- a/src/backend/foreign/foreign.c +++ b/src/backend/foreign/foreign.c @@ -328,6 +328,10 @@ GetFdwRoutine(Oid fdwhandler) elog(ERROR, "foreign-data wrapper handler function %u did not return an FdwRoutine struct", fdwhandler); + /* The FDW must support both or nothing */ + Assert((routine->CommitForeignTransaction && routine->RollbackForeignTransaction) || + (!routine->CommitForeignTransaction && !routine->RollbackForeignTransaction)); + return routine; } diff --git a/src/include/access/fdwxact.h b/src/include/access/fdwxact.h new file mode 100644 index 0000000000..1d4a285c75 --- /dev/null +++ b/src/include/access/fdwxact.h @@ -0,0 +1,34 @@ +/* + * fdwxact.h + * + * PostgreSQL global transaction manager + * + * Portions Copyright (c) 2021, PostgreSQL Global Development Group + * + * src/include/access/fdwxact.h + */ +#ifndef FDWXACT_H +#define FDWXACT_H + +#include "access/xact.h" +#include "foreign/foreign.h" + +/* Flag passed to FDW transaction management APIs */ +#define FDWXACT_FLAG_ONEPHASE 0x01 /* transaction can commit/rollback + * without preparation */ +#define FDWXACT_FLAG_PARALLEL_WORKER 0x02 /* is parallel worker? */ + +/* State data for foreign transaction resolution, passed to FDW callbacks */ +typedef struct FdwXactInfo +{ + ForeignServer *server; + UserMapping *usermapping; + + int flags; /* OR of FDWXACT_FLAG_xx flags */ +} FdwXactInfo; + +/* Function declarations */ +extern void AtEOXact_FdwXact(bool isCommit, bool is_parallel_worker); +extern void AtPrepare_FdwXact(void); + +#endif /* FDWXACT_H */ diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index a801cd3057..c3539a4d73 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -13,6 +13,7 @@ #define FDWAPI_H #include "access/parallel.h" +#include "access/fdwxact.h" #include "nodes/execnodes.h" #include "nodes/pathnodes.h" @@ -191,6 +192,10 @@ typedef void (*ForeignAsyncConfigureWait_function) (AsyncRequest *areq); typedef void (*ForeignAsyncNotify_function) (AsyncRequest *areq); +typedef void (*CommitForeignTransaction_function) (FdwXactInfo *finfo); +typedef void (*RollbackForeignTransaction_function) (FdwXactInfo *finfo); + + /* * FdwRoutine is the struct returned by a foreign-data wrapper's handler * function. It provides pointers to the callback functions needed by the @@ -278,6 +283,10 @@ typedef struct FdwRoutine ForeignAsyncRequest_function ForeignAsyncRequest; ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait; ForeignAsyncNotify_function ForeignAsyncNotify; + + /* Support functions for transaction management */ + CommitForeignTransaction_function CommitForeignTransaction; + RollbackForeignTransaction_function RollbackForeignTransaction; } FdwRoutine; @@ -291,4 +300,8 @@ extern bool IsImportableForeignTable(const char *tablename, ImportForeignSchemaStmt *stmt); extern Path *GetExistingLocalJoinPath(RelOptInfo *joinrel); +/* Functions in transam/fdwxact.c */ +extern void FdwXactRegisterEntry(UserMapping *usermapping); +extern void FdwXactUnregisterEntry(UserMapping *usermapping); + #endif /* FDWAPI_H */ -- 2.24.3 (Apple Git-128)