From f7be8aed5dbd96d9e9b14967fabe9d275732b5ed Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Mon, 2 Nov 2020 14:32:10 +0900 Subject: [PATCH v34 09/11] postgres_fdw marks foreign transaction as modified on modification. This commit enables postgres_fdw to execute two-phase commit protocol on transaction commit (without explicitly executing PREPARE TRANSACTION). Co-authored-by: Masahiko Sawada, Ashutosh Bapat --- contrib/postgres_fdw/connection.c | 19 ++++++++++++++++++- contrib/postgres_fdw/postgres_fdw.c | 2 ++ contrib/postgres_fdw/postgres_fdw.h | 1 + 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index b7b9e789d0..967a2fca53 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -61,6 +61,7 @@ typedef struct ConnCacheEntry bool changing_xact_state; /* xact state change in process */ bool invalidated; /* true if reconnect is pending */ Oid serverid; /* foreign server OID used to get server name */ + bool modified; /* true if data on the foreign server is modified */ uint32 server_hashvalue; /* hash value of foreign server OID */ uint32 mapping_hashvalue; /* hash value of user mapping OID */ } ConnCacheEntry; @@ -297,6 +298,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) entry->changing_xact_state = false; entry->invalidated = false; entry->serverid = server->serverid; + entry->modified = false; entry->server_hashvalue = GetSysCacheHashValue1(FOREIGNSERVEROID, ObjectIdGetDatum(server->serverid)); @@ -311,6 +313,20 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) entry->conn, server->servername, user->umid, user->userid); } +void +MarkConnectionModified(UserMapping *user) +{ + ConnCacheEntry *entry; + + entry = GetConnectionCacheEntry(user->umid); + + if (entry && !entry->modified) + { + FdwXactRegisterXact(user->serverid, user->userid, true); + entry->modified = true; + } +} + /* * Connect to remote server using specified server and user mapping properties. */ @@ -582,7 +598,7 @@ begin_remote_xact(ConnCacheEntry *entry, UserMapping *user) entry->conn); /* Register the foreign server to the transaction */ - FdwXactRegisterXact(user->serverid, user->userid); + FdwXactRegisterXact(user->serverid, user->userid, false); if (IsolationIsSerializable()) sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"; @@ -591,6 +607,7 @@ begin_remote_xact(ConnCacheEntry *entry, UserMapping *user) entry->changing_xact_state = true; do_sql_command(entry->conn, sql); entry->xact_depth = 1; + entry->modified = false; entry->changing_xact_state = false; } diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index dd55bde3bb..27263bbf8e 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -2487,6 +2487,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * establish new connection if necessary. */ dmstate->conn = GetConnection(user, false); + MarkConnectionModified(user); /* Update the foreign-join-related fields. */ if (fsplan->scan.scanrelid == 0) @@ -3680,6 +3681,7 @@ create_foreign_modify(EState *estate, /* Open connection; report that we'll create a prepared statement. */ fmstate->conn = GetConnection(user, true); + MarkConnectionModified(user); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Set up remote query information. */ diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 8c72c910c7..fc5a0766f4 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -132,6 +132,7 @@ extern void reset_transmission_modes(int nestlevel); /* in connection.c */ extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt); extern void ReleaseConnection(PGconn *conn); +extern void MarkConnectionModified(UserMapping *user); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); extern PGresult *pgfdw_get_result(PGconn *conn, const char *query); -- 2.27.0