From 2b210cdb93e352c0599e8b257c0683fd58b5ddf7 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Tue, 26 Dec 2017 19:52:05 +0900 Subject: [PATCH 3/3] postgres_fdw supports atomic distributed transaction commit. --- contrib/postgres_fdw/connection.c | 566 +++++++++++++++--------- contrib/postgres_fdw/expected/postgres_fdw.out | 343 ++++++++++++++- contrib/postgres_fdw/option.c | 5 +- contrib/postgres_fdw/postgres_fdw.c | 93 ++++- contrib/postgres_fdw/postgres_fdw.h | 14 +- contrib/postgres_fdw/sql/postgres_fdw.sql | 133 ++++++ doc/src/sgml/postgres-fdw.sgml | 37 ++ 7 files changed, 952 insertions(+), 239 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 00c926b..775e0c0 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -14,9 +14,11 @@ #include "postgres_fdw.h" +#include "access/fdwxact.h" #include "access/htup_details.h" #include "catalog/pg_user_mapping.h" #include "access/xact.h" +#include "commands/defrem.h" #include "mb/pg_wchar.h" #include "miscadmin.h" #include "pgstat.h" @@ -73,13 +75,13 @@ static unsigned int prep_stmt_number = 0; static bool xact_got_connection = false; /* prototypes of private functions */ -static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); +static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user, + bool connection_error_ok); static void disconnect_pg_server(ConnCacheEntry *entry); static void check_conn_params(const char **keywords, const char **values, UserMapping *user); static void configure_remote_session(PGconn *conn); static void do_sql_command(PGconn *conn, const char *sql); -static void begin_remote_xact(ConnCacheEntry *entry); -static void pgfdw_xact_callback(XactEvent event, void *arg); +static void begin_remote_xact(ConnCacheEntry *entry, Oid serverid, Oid userid); static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, @@ -91,24 +93,27 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors); static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result); +static void pgfdw_cleanup_after_transaction(ConnCacheEntry *entry, bool is_commit); +static ConnCacheEntry *GetConnectionCacheEntry(Oid umid); - -/* - * Get a PGconn which can be used to execute queries on the remote PostgreSQL - * server with the user's authorization. A new connection is established - * if we don't already have a suitable one, and a transaction is opened at - * the right subtransaction nesting depth if we didn't do that already. - * - * will_prep_stmt must be true if caller intends to create any prepared - * statements. Since those don't go away automatically at transaction end - * (not even on error), we need this flag to cue manual cleanup. - */ PGconn * -GetConnection(UserMapping *user, bool will_prep_stmt) +GetExistingConnection(Oid umid) { - bool found; - ConnCacheEntry *entry; - ConnCacheKey key; + ConnCacheEntry *entry; + + entry = GetConnectionCacheEntry(umid); + + Assert(entry->conn != NULL); + + return entry->conn; +} + +static ConnCacheEntry * +GetConnectionCacheEntry(Oid umid) +{ + ConnCacheEntry *entry; + ConnCacheKey key; + bool found; /* First time through, initialize connection cache hashtable */ if (ConnectionHash == NULL) @@ -128,7 +133,6 @@ GetConnection(UserMapping *user, bool will_prep_stmt) * Register some callback functions that manage connection cleanup. * This should be done just once in each backend. */ - RegisterXactCallback(pgfdw_xact_callback, NULL); RegisterSubXactCallback(pgfdw_subxact_callback, NULL); CacheRegisterSyscacheCallback(FOREIGNSERVEROID, pgfdw_inval_callback, (Datum) 0); @@ -136,11 +140,8 @@ GetConnection(UserMapping *user, bool will_prep_stmt) pgfdw_inval_callback, (Datum) 0); } - /* Set flag that we did GetConnection during the current transaction */ - xact_got_connection = true; - /* Create hash key for the entry. Assume no pad bytes in key struct */ - key = user->umid; + key = umid; /* * Find or create cached entry for requested connection. @@ -155,6 +156,28 @@ GetConnection(UserMapping *user, bool will_prep_stmt) entry->conn = NULL; } + return entry; +} + +/* + * Get a PGconn which can be used to execute queries on the remote PostgreSQL + * server with the user's authorization. A new connection is established + * if we don't already have a suitable one, and a transaction is opened at + * the right subtransaction nesting depth if we didn't do that already. + * + * will_prep_stmt must be true if caller intends to create any prepared + * statements. Since those don't go away automatically at transaction end + * (not even on error), we need this flag to cue manual cleanup. + */ +PGconn * +GetConnection(UserMapping *user, bool will_prep_stmt, + bool start_transaction, bool connection_error_ok) +{ + ConnCacheEntry *entry; + + /* Get connection cache entry from cache */ + entry = GetConnectionCacheEntry(user->umid); + /* Reject further use of connections which failed abort cleanup. */ pgfdw_reject_incomplete_xact_state_change(entry); @@ -198,7 +221,16 @@ GetConnection(UserMapping *user, bool will_prep_stmt) ObjectIdGetDatum(user->umid)); /* Now try to make the connection */ - entry->conn = connect_pg_server(server, user); + entry->conn = connect_pg_server(server, user, connection_error_ok); + + Assert(entry->conn || connection_error_ok); + + if (!entry->conn && connection_error_ok) + { + elog(DEBUG3, "attempt to connection to server \"%s\" by postgres_fdw failed", + server->servername); + return NULL; + } elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)", entry->conn, server->servername, user->umid, user->userid); @@ -207,7 +239,12 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* * Start a new transaction or subtransaction if needed. */ - begin_remote_xact(entry); + if (start_transaction) + { + begin_remote_xact(entry, user->serverid, user->userid); + /* Set flag that we did GetConnection during the current transaction */ + xact_got_connection = true; + } /* Remember if caller will prepare statements */ entry->have_prep_stmt |= will_prep_stmt; @@ -217,9 +254,12 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* * Connect to remote server using specified server and user mapping properties. + * If the attempt to connect fails, and the caller can handle connection failure + * (connection_error_ok = true) return NULL, throw error otherwise. */ static PGconn * -connect_pg_server(ForeignServer *server, UserMapping *user) +connect_pg_server(ForeignServer *server, UserMapping *user, + bool connection_error_ok) { PGconn *volatile conn = NULL; @@ -265,11 +305,25 @@ connect_pg_server(ForeignServer *server, UserMapping *user) conn = PQconnectdbParams(keywords, values, false); if (!conn || PQstatus(conn) != CONNECTION_OK) - ereport(ERROR, - (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), - errmsg("could not connect to server \"%s\"", - server->servername), - errdetail_internal("%s", pchomp(PQerrorMessage(conn))))); + { + char *connmessage; + int msglen; + + /* libpq typically appends a newline, strip that */ + connmessage = pstrdup(PQerrorMessage(conn)); + msglen = strlen(connmessage); + if (msglen > 0 && connmessage[msglen - 1] == '\n') + connmessage[msglen - 1] = '\0'; + + if (connection_error_ok) + return NULL; + else + ereport(ERROR, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg("could not connect to server \"%s\"", + server->servername), + errdetail_internal("%s", pchomp(PQerrorMessage(conn))))); + } /* * Check that non-superuser has used password to establish connection; @@ -414,15 +468,24 @@ do_sql_command(PGconn *conn, const char *sql) * control which remote queries share a snapshot. */ static void -begin_remote_xact(ConnCacheEntry *entry) +begin_remote_xact(ConnCacheEntry *entry, Oid serverid, Oid userid) { int curlevel = GetCurrentTransactionNestLevel(); + ForeignServer *server = GetForeignServer(serverid); /* Start main transaction if we haven't yet */ if (entry->xact_depth <= 0) { const char *sql; + /* + * Register the new foreign server and check whether the two phase + * compliance is possible. + */ + FdwXactRegisterForeignServer(serverid, userid, + server_uses_two_phase_commit(server), + false); + elog(DEBUG3, "starting remote transaction on connection %p", entry->conn); @@ -644,193 +707,6 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, } /* - * pgfdw_xact_callback --- cleanup at main-transaction end. - */ -static void -pgfdw_xact_callback(XactEvent event, void *arg) -{ - HASH_SEQ_STATUS scan; - ConnCacheEntry *entry; - - /* Quick exit if no connections were touched in this transaction. */ - if (!xact_got_connection) - return; - - /* - * Scan all connection cache entries to find open remote transactions, and - * close them. - */ - hash_seq_init(&scan, ConnectionHash); - while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) - { - PGresult *res; - - /* Ignore cache entry if no open connection right now */ - if (entry->conn == NULL) - continue; - - /* If it has an open remote transaction, try to close it */ - if (entry->xact_depth > 0) - { - bool abort_cleanup_failure = false; - - elog(DEBUG3, "closing remote transaction on connection %p", - entry->conn); - - switch (event) - { - case XACT_EVENT_PARALLEL_PRE_COMMIT: - case XACT_EVENT_PRE_COMMIT: - - /* - * If abort cleanup previously failed for this connection, - * we can't issue any more commands against it. - */ - pgfdw_reject_incomplete_xact_state_change(entry); - - /* Commit all remote transactions during pre-commit */ - entry->changing_xact_state = true; - do_sql_command(entry->conn, "COMMIT TRANSACTION"); - entry->changing_xact_state = false; - - /* - * If there were any errors in subtransactions, and we - * made prepared statements, do a DEALLOCATE ALL to make - * sure we get rid of all prepared statements. This is - * annoying and not terribly bulletproof, but it's - * probably not worth trying harder. - * - * DEALLOCATE ALL only exists in 8.3 and later, so this - * constrains how old a server postgres_fdw can - * communicate with. We intentionally ignore errors in - * the DEALLOCATE, so that we can hobble along to some - * extent with older servers (leaking prepared statements - * as we go; but we don't really support update operations - * pre-8.3 anyway). - */ - if (entry->have_prep_stmt && entry->have_error) - { - res = PQexec(entry->conn, "DEALLOCATE ALL"); - PQclear(res); - } - entry->have_prep_stmt = false; - entry->have_error = false; - break; - case XACT_EVENT_PRE_PREPARE: - - /* - * We disallow remote transactions that modified anything, - * since it's not very reasonable to hold them open until - * the prepared transaction is committed. For the moment, - * throw error unconditionally; later we might allow - * read-only cases. Note that the error will cause us to - * come right back here with event == XACT_EVENT_ABORT, so - * we'll clean up the connection state at that point. - */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot prepare a transaction that modified remote tables"))); - break; - case XACT_EVENT_PARALLEL_COMMIT: - case XACT_EVENT_COMMIT: - case XACT_EVENT_PREPARE: - /* Pre-commit should have closed the open transaction */ - elog(ERROR, "missed cleaning up connection during pre-commit"); - break; - case XACT_EVENT_PARALLEL_ABORT: - case XACT_EVENT_ABORT: - - /* - * Don't try to clean up the connection if we're already - * in error recursion trouble. - */ - if (in_error_recursion_trouble()) - entry->changing_xact_state = true; - - /* - * If connection is already unsalvageable, don't touch it - * further. - */ - if (entry->changing_xact_state) - break; - - /* - * Mark this connection as in the process of changing - * transaction state. - */ - entry->changing_xact_state = true; - - /* Assume we might have lost track of prepared statements */ - entry->have_error = true; - - /* - * If a command has been submitted to the remote server by - * using an asynchronous execution function, the command - * might not have yet completed. Check to see if a - * command is still being processed by the remote server, - * and if so, request cancellation of the command. - */ - if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE && - !pgfdw_cancel_query(entry->conn)) - { - /* Unable to cancel running query. */ - abort_cleanup_failure = true; - } - else if (!pgfdw_exec_cleanup_query(entry->conn, - "ABORT TRANSACTION", - false)) - { - /* Unable to abort remote transaction. */ - abort_cleanup_failure = true; - } - else if (entry->have_prep_stmt && entry->have_error && - !pgfdw_exec_cleanup_query(entry->conn, - "DEALLOCATE ALL", - true)) - { - /* Trouble clearing prepared statements. */ - abort_cleanup_failure = true; - } - else - { - entry->have_prep_stmt = false; - entry->have_error = false; - } - - /* Disarm changing_xact_state if it all worked. */ - entry->changing_xact_state = abort_cleanup_failure; - break; - } - } - - /* Reset state to show we're out of a transaction */ - entry->xact_depth = 0; - - /* - * If the connection isn't in a good idle state, discard it to - * recover. Next GetConnection will open a new connection. - */ - if (PQstatus(entry->conn) != CONNECTION_OK || - PQtransactionStatus(entry->conn) != PQTRANS_IDLE || - entry->changing_xact_state) - { - elog(DEBUG3, "discarding connection %p", entry->conn); - disconnect_pg_server(entry); - } - } - - /* - * Regardless of the event type, we can now mark ourselves as out of the - * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE, - * this saves a useless scan of the hashtable during COMMIT or PREPARE.) - */ - xact_got_connection = false; - - /* Also reset cursor numbering for next transaction */ - cursor_number = 0; -} - -/* * pgfdw_subxact_callback --- cleanup at subtransaction end. */ static void @@ -1193,3 +1069,255 @@ exit: ; *result = last_res; return timed_out; } + +/* + * The function prepares transaction on foreign server. This function + * is called only at the pre-commit phase of the local transaction. Since + * we should have the connection to the server that we are interested in + * we don't use serverid and userid that are necessary to get user mapping + * that is the key of the connection cache. + */ +bool +postgresPrepareForeignTransaction(Oid serverid, Oid userid, Oid umid, + const char *prep_id) +{ + ConnCacheEntry *entry = NULL; + + entry = GetConnectionCacheEntry(umid); + + pgfdw_reject_incomplete_xact_state_change(entry); + + if (entry->conn) + { + StringInfo command; + bool result; + + pgfdw_reject_incomplete_xact_state_change(entry); + + command = makeStringInfo(); + appendStringInfo(command, "PREPARE TRANSACTION '%s'", prep_id); + + entry->changing_xact_state = true; + result = pgfdw_exec_cleanup_query(entry->conn, command->data, false); + entry->changing_xact_state = false; + + elog(DEBUG1, "prepare foreign transaction on server %u with ID %s", + serverid, prep_id); + + pgfdw_cleanup_after_transaction(entry, true); + return result; + } + + return false; +} + +/* + * The function commits or abort the transactionon foreign server. This + * function is called both at the pre-commit phase of the local transaction + * when committing and at the end of the local transaction when aborting. + * Since we should the connections to the server that involved with the local + * transaction we don't use serverid and userid that are necessary to get + * user mapping that is the key of connection cache. + */ +bool +postgresEndForeignTransaction(Oid serverid, Oid userid, Oid umid, + bool is_commit) +{ + ConnCacheEntry *entry = NULL; + + entry = GetConnectionCacheEntry(umid); + + /* + * If abort cleanup previously failed for this connection, we can't issue + * any more commands against it. + */ + if (is_commit) + pgfdw_reject_incomplete_xact_state_change(entry); + + if (entry->conn) + { + StringInfo command; + bool result; + + command = makeStringInfo(); + appendStringInfo(command, "%s TRANSACTION", is_commit ? "COMMIT" : "ROLLBACK"); + entry->changing_xact_state = true; + result = pgfdw_exec_cleanup_query(entry->conn, command->data, false); + entry->changing_xact_state = false; + + pgfdw_cleanup_after_transaction(entry, true); + return result; + } + + return false; +} + +/* + * The function commits or aborts prepared transaction on foreign server. + * This function could be called both at end of the local transaction and + * in a new transaction, for example, by the resolver process. + */ +bool +postgresResolvePreparedForeignTransaction(Oid serverid, Oid userid, Oid umid, + bool is_commit, const char *prep_id) +{ + ConnCacheEntry *entry; + PGconn *conn; + + /* + * If we are in a valid transaction state it means that we are trying to + * resolve a transaction in a new transaction just before started and that + * we might not have a connect to the server yet. So we use GetConnection + * which establishes the connection if don't have it yet. This can happen when + * the foreign transaction resolve process tries to resolve it. On the other + * hand, if we are not in a valid transaction state it means that we are trying + * to resolve a foreign transaction at end of the local transaction. Since we + * should have the connection to the server we just get a connection cache entry. + */ + if (IsTransactionState()) + conn = GetConnection(GetUserMapping(userid, serverid), false, false, false); + else + { + entry = GetConnectionCacheEntry(umid); + + /* Reject further use of connections which failed abort cleanup */ + if (is_commit) + pgfdw_reject_incomplete_xact_state_change(entry); + + conn = entry->conn; + } + + if (conn) + { + StringInfo command; + PGresult *res; + bool result; + + command = makeStringInfo(); + appendStringInfo(command, "%s PREPARED '%s'", + is_commit ? "COMMIT" : "ROLLBACK", + prep_id); + res = PQexec(conn, command->data); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + int sqlstate; + char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE); + + /* + * The command failed, raise a warning to log the reason of failure. + * We may not be in a transaction here, so raising error doesn't + * help. Even if we are in a transaction, it would be the resolver + * transaction, which will get aborted on raising error, thus + * delaying resolution of other prepared foreign transactions. + */ + pgfdw_report_error(WARNING, res, conn, false, command->data); + + if (diag_sqlstate) + { + sqlstate = MAKE_SQLSTATE(diag_sqlstate[0], + diag_sqlstate[1], + diag_sqlstate[2], + diag_sqlstate[3], + diag_sqlstate[4]); + } + else + sqlstate = ERRCODE_CONNECTION_FAILURE; + + /* + * If we tried to COMMIT/ABORT a prepared transaction and the prepared + * transaction was missing on the foreign server, it was probably + * resolved by some other means. Anyway, it should be considered as resolved. + */ + result = (sqlstate == ERRCODE_UNDEFINED_OBJECT); + } + else + result = true; + + elog(DEBUG1, "%s prepared foreign transaction on server %u with ID %s", + is_commit ? "committed" : "aborted", serverid, prep_id); + + PQclear(res); + ReleaseConnection(conn); + return result; + } + + return false; +} + +/* Cleanup at main-transaction end */ +static void +pgfdw_cleanup_after_transaction(ConnCacheEntry *entry, bool is_commit) +{ + if (entry->xact_depth > 0) + { + if (is_commit) + { + /* + * If there were any errors in subtransactions, and we made prepared + * statements, do a DEALLOCATE ALL to make sure we get rid of all + * prepared statements. This is annoying and not terribly bulletproof, + * but it's probably not worth trying harder. + * + * DEALLOCATE ALL only exists in 8.3 and later, so this constrains how + * old a server postgres_fdw can communicate with. We intentionally + * ignore errors in the DEALLOCATE, so that we can hobble along to some + * extent with older servers (leaking prepared statements as we go; + * but we don't really support update operations pre-8.3 anyway). + */ + if (entry->have_prep_stmt && entry->have_error) + { + PGresult *res = PQexec(entry->conn, "DEALLOCATE ALL"); + PQclear(res); + } + + entry->have_prep_stmt = false; + entry->have_error = false; + } + else + { + /* + * Don't try to clean up the connection if we're already in error + * recursion trouble. + */ + if (in_error_recursion_trouble()) + entry->changing_xact_state = true; + + /* If connection is already unsalvageable, don't touch it further */ + + if (!entry->changing_xact_state) + { + entry->changing_xact_state = true; + + if (entry->have_prep_stmt && + !pgfdw_exec_cleanup_query(entry->conn, "DEALLOCATE ALL", true)) + { + entry->changing_xact_state = true; + } + } + } + /* Reset state to show we're out of a transaction */ + entry->xact_depth = 0; + + /* + * If the connection isn't in a good idle state, discard it to + * recover. Next GetConnection will open a new connection. + */ + if (PQstatus(entry->conn) != CONNECTION_OK || + PQtransactionStatus(entry->conn) != PQTRANS_IDLE) + { + elog(DEBUG3, "discarding connection %p", entry->conn); + PQfinish(entry->conn); + entry->conn = NULL; + } + } + + /* + * Regardless of the event type, we can now mark ourselves as out of the + * transaction. + */ + xact_got_connection = false; + + /* Also reset cursor numbering for next transaction */ + cursor_number = 0; +} diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index adbf77f..e3fc66c 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -13,12 +13,17 @@ DO $d$ OPTIONS (dbname '$$||current_database()||$$', port '$$||current_setting('port')||$$' )$$; + EXECUTE $$CREATE SERVER loopback3 FOREIGN DATA WRAPPER postgres_fdw + OPTIONS (dbname '$$||current_database()||$$', + port '$$||current_setting('port')||$$' + )$$; END; $d$; CREATE USER MAPPING FOR public SERVER testserver1 OPTIONS (user 'value', password 'value'); CREATE USER MAPPING FOR CURRENT_USER SERVER loopback; CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2; +CREATE USER MAPPING FOR CURRENT_USER SERVER loopback3; -- =================================================================== -- create objects used through FDW loopback server -- =================================================================== @@ -52,6 +57,13 @@ CREATE TABLE "S 1"."T 4" ( c3 text, CONSTRAINT t4_pkey PRIMARY KEY (c1) ); +CREATE TABLE "S 1"."T 5" ( + c1 int NOT NULL +); +CREATE TABLE "S 1"."T 6" ( + c1 int NOT NULL, + CONSTRAINT t6_pkey PRIMARY KEY (c1) +); INSERT INTO "S 1"."T 1" SELECT id, id % 10, @@ -82,6 +94,7 @@ ANALYZE "S 1"."T 1"; ANALYZE "S 1"."T 2"; ANALYZE "S 1"."T 3"; ANALYZE "S 1"."T 4"; +ANALYZE "S 1"."T 5"; -- =================================================================== -- create foreign tables -- =================================================================== @@ -124,6 +137,15 @@ CREATE FOREIGN TABLE ft6 ( c2 int NOT NULL, c3 text ) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 4'); +CREATE FOREIGN TABLE ft7_not_twophase ( + c1 int NOT NULL +) SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 5'); +CREATE FOREIGN TABLE ft8_twophase ( + c1 int NOT NULL +) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 5'); +CREATE FOREIGN TABLE ft9_twophase ( + c1 int NOT NULL +) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 5'); -- A table with oids. CREATE FOREIGN TABLE doesn't support the -- WITH OIDS option, but ALTER does. CREATE FOREIGN TABLE ft_pg_type ( @@ -180,16 +202,19 @@ ALTER FOREIGN TABLE ft2 OPTIONS (schema_name 'S 1', table_name 'T 1'); ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1'); \det+ - List of foreign tables - Schema | Table | Server | FDW options | Description ---------+------------+-----------+--------------------------------------------------+------------- - public | ft1 | loopback | (schema_name 'S 1', table_name 'T 1') | - public | ft2 | loopback | (schema_name 'S 1', table_name 'T 1') | - public | ft4 | loopback | (schema_name 'S 1', table_name 'T 3') | - public | ft5 | loopback | (schema_name 'S 1', table_name 'T 4') | - public | ft6 | loopback2 | (schema_name 'S 1', table_name 'T 4') | - public | ft_pg_type | loopback | (schema_name 'pg_catalog', table_name 'pg_type') | -(6 rows) + List of foreign tables + Schema | Table | Server | FDW options | Description +--------+------------------+-----------+--------------------------------------------------+------------- + public | ft1 | loopback | (schema_name 'S 1', table_name 'T 1') | + public | ft2 | loopback | (schema_name 'S 1', table_name 'T 1') | + public | ft4 | loopback | (schema_name 'S 1', table_name 'T 3') | + public | ft5 | loopback | (schema_name 'S 1', table_name 'T 4') | + public | ft6 | loopback2 | (schema_name 'S 1', table_name 'T 4') | + public | ft7_not_twophase | loopback | (schema_name 'S 1', table_name 'T 5') | + public | ft8_twophase | loopback2 | (schema_name 'S 1', table_name 'T 5') | + public | ft9_twophase | loopback3 | (schema_name 'S 1', table_name 'T 5') | + public | ft_pg_type | loopback | (schema_name 'pg_catalog', table_name 'pg_type') | +(9 rows) -- Test that alteration of server options causes reconnection -- Remote's errors might be non-English, so hide them to ensure stable results @@ -7794,3 +7819,301 @@ SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t (4 rows) RESET enable_partition_wise_join; +-- =================================================================== +-- test Atomic commit across foreign servers +-- =================================================================== +ALTER SERVER loopback OPTIONS(ADD two_phase_commit 'off'); +ALTER SERVER loopback2 OPTIONS(ADD two_phase_commit 'on'); +ALTER SERVER loopback3 OPTIONS(ADD two_phase_commit 'on'); +-- Check two_phase_commit setting +SELECT srvname FROM pg_foreign_server WHERE 'two_phase_commit=on' = ANY(srvoptions) or 'two_phase_commit=off' = ANY(srvoptions); + srvname +----------- + loopback + loopback2 + loopback3 +(3 rows) + +-- modify one supported server and commit. +BEGIN; +INSERT INTO ft8_twophase VALUES(1); +COMMIT; +SELECT * FROM ft8_twophase; + c1 +---- + 1 +(1 row) + +-- modify one supported server and rollback. +BEGIN; +INSERT INTO ft8_twophase VALUES(1); +ROLLBACK; +SELECT * FROM ft8_twophase; + c1 +---- + 1 +(1 row) + +-- modify two supported server and commit. +BEGIN; +INSERT INTO ft8_twophase VALUES(2); +INSERT INTO ft9_twophase VALUES(2); +COMMIT; +SELECT * FROM ft8_twophase; + c1 +---- + 1 + 2 + 2 +(3 rows) + +SELECT * FROM ft9_twophase; + c1 +---- + 1 + 2 + 2 +(3 rows) + +-- modify two supported server and rollback. +BEGIN; +INSERT INTO ft8_twophase VALUES(3); +INSERT INTO ft9_twophase VALUES(3); +ROLLBACK; +SELECT * FROM ft8_twophase; + c1 +---- + 1 + 2 + 2 +(3 rows) + +SELECT * FROM ft9_twophase; + c1 +---- + 1 + 2 + 2 +(3 rows) + +-- modify local and one supported server and commit. +BEGIN; +INSERT INTO ft8_twophase VALUES(4); +INSERT INTO "S 1"."T 6" VALUES (4); +COMMIT; +SELECT * FROM ft8_twophase; + c1 +---- + 1 + 2 + 2 + 4 +(4 rows) + +SELECT * FROM "S 1"."T 6"; + c1 +---- + 4 +(1 row) + +-- modify local and one supported server and rollback. +BEGIN; +INSERT INTO ft8_twophase VALUES(5); +INSERT INTO "S 1"."T 6" VALUES (5); +ROLLBACK; +SELECT * FROM ft8_twophase; + c1 +---- + 1 + 2 + 2 + 4 +(4 rows) + +SELECT * FROM "S 1"."T 6"; + c1 +---- + 4 +(1 row) + +-- modify supported server and non-supported server and commit. +BEGIN; +INSERT INTO ft7_not_twophase VALUES(6); +INSERT INTO ft8_twophase VALUES(6); +COMMIT; +SELECT * FROM ft7_not_twophase; + c1 +---- + 1 + 2 + 2 + 4 + 6 + 6 +(6 rows) + +SELECT * FROM ft8_twophase; + c1 +---- + 1 + 2 + 2 + 4 + 6 + 6 +(6 rows) + +-- modify supported server and non-supported server and rollback. +BEGIN; +INSERT INTO ft7_not_twophase VALUES(7); +INSERT INTO ft8_twophase VALUES(7); +ROLLBACK; +SELECT * FROM ft7_not_twophase; + c1 +---- + 1 + 2 + 2 + 4 + 6 + 6 +(6 rows) + +SELECT * FROM ft8_twophase; + c1 +---- + 1 + 2 + 2 + 4 + 6 + 6 +(6 rows) + +-- modify foreign server and raise an error +BEGIN; +INSERT INTO ft8_twophase VALUES(8); +INSERT INTO ft9_twophase VALUES(NULL); -- violation +ERROR: null value in column "c1" violates not-null constraint +DETAIL: Failing row contains (null). +CONTEXT: Remote SQL command: INSERT INTO "S 1"."T 5"(c1) VALUES ($1) +COMMIT; +SELECT * FROM ft8_twophase; + c1 +---- + 1 + 2 + 2 + 4 + 6 + 6 +(6 rows) + +SELECT * FROM ft9_twophase; + c1 +---- + 1 + 2 + 2 + 4 + 6 + 6 +(6 rows) + +-- commit and rollback foreign transactions that are part of +-- prepare transaction. +BEGIN; +INSERT INTO ft8_twophase VALUES(9); +INSERT INTO ft9_twophase VALUES(9); +PREPARE TRANSACTION 'gx1'; +COMMIT PREPARED 'gx1'; +SELECT * FROM ft8_twophase; + c1 +---- + 1 + 2 + 2 + 4 + 6 + 6 + 9 + 9 +(8 rows) + +SELECT * FROM ft9_twophase; + c1 +---- + 1 + 2 + 2 + 4 + 6 + 6 + 9 + 9 +(8 rows) + +BEGIN; +INSERT INTO ft8_twophase VALUES(10); +INSERT INTO ft9_twophase VALUES(10); +PREPARE TRANSACTION 'gx1'; +ROLLBACK PREPARED 'gx1'; +SELECT * FROM ft8_twophase; + c1 +---- + 1 + 2 + 2 + 4 + 6 + 6 + 9 + 9 +(8 rows) + +SELECT * FROM ft9_twophase; + c1 +---- + 1 + 2 + 2 + 4 + 6 + 6 + 9 + 9 +(8 rows) + +-- fails, cannot prepare the transaction if non-supporeted +-- server involved in. +BEGIN; +INSERT INTO ft7_not_twophase VALUES(11); +INSERT INTO ft8_twophase VALUES(11); +PREPARE TRANSACTION 'gx1'; +ERROR: can not prepare the transaction because some foreign servers involved in transaction can not prepare the transaction +SELECT * FROM ft7_not_twophase; + c1 +---- + 1 + 2 + 2 + 4 + 6 + 6 + 9 + 9 +(8 rows) + +SELECT * FROM ft8_twophase; + c1 +---- + 1 + 2 + 2 + 4 + 6 + 6 + 9 + 9 +(8 rows) + diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c index 082f79a..dadd519 100644 --- a/contrib/postgres_fdw/option.c +++ b/contrib/postgres_fdw/option.c @@ -108,7 +108,8 @@ postgres_fdw_validator(PG_FUNCTION_ARGS) * Validate option value, when we can do so without any context. */ if (strcmp(def->defname, "use_remote_estimate") == 0 || - strcmp(def->defname, "updatable") == 0) + strcmp(def->defname, "updatable") == 0 || + strcmp(def->defname, "two_phase_commit") == 0) { /* these accept only boolean values */ (void) defGetBoolean(def); @@ -177,6 +178,8 @@ InitPgFdwOptions(void) /* fetch_size is available on both server and table */ {"fetch_size", ForeignServerRelationId, false}, {"fetch_size", ForeignTableRelationId, false}, + /* two phase commit support */ + {"two_phase_commit", ForeignServerRelationId, false}, {NULL, InvalidOid, false} }; diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index c1d7f80..1f96e64 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -14,6 +14,8 @@ #include "postgres_fdw.h" +#include "access/fdwxact.h" +#include "access/xact.h" #include "access/htup_details.h" #include "access/sysattr.h" #include "catalog/pg_class.h" @@ -353,6 +355,7 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, RelOptInfo *input_rel, RelOptInfo *output_rel); +extern char*postgresGetPrepareId(Oid serveroid, Oid userid, int *prep_info_len); /* * Helper functions @@ -434,7 +437,6 @@ static void merge_fdw_options(PgFdwRelationInfo *fpinfo, const PgFdwRelationInfo *fpinfo_o, const PgFdwRelationInfo *fpinfo_i); - /* * Foreign-data wrapper handler function: return a struct with pointers * to my callback routines. @@ -483,6 +485,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for join push-down */ routine->GetForeignJoinPaths = postgresGetForeignJoinPaths; + /* Support functions for foreign transactions */ + routine->GetPrepareId = postgresGetPrepareId; + routine->PrepareForeignTransaction = postgresPrepareForeignTransaction; + routine->ResolvePreparedForeignTransaction = postgresResolvePreparedForeignTransaction; + routine->EndForeignTransaction = postgresEndForeignTransaction; + /* Support functions for upper relation push-down */ routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; @@ -490,6 +498,38 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) } /* + * postgresGetPrepareId + * + * The function crafts prepared transaction identifier. PostgreSQL documentation + * mentions two restrictions on the name + * 1. String literal, less than 200 bytes long. + * 2. Should not be same as any other concurrent prepared transaction id. + * + * To make the prepared transaction id, we should ideally use something like + * UUID, which gives unique ids with high probability, but that may be expensive + * here and UUID extension which provides the function to generate UUID is + * not part of the core. + */ +extern char * +postgresGetPrepareId(Oid serverid, Oid userid, int *prep_info_len) +{ + /* Maximum length of the prepared transaction id, borrowed from twophase.c */ +#define PREP_XACT_ID_MAX_LEN 200 +#define RANDOM_LARGE_MULTIPLIER 1000 + char*prep_info; + + /* Allocate the memory in the same context as the hash entry */ + prep_info = (char *)palloc(PREP_XACT_ID_MAX_LEN * sizeof(char)); + snprintf(prep_info, PREP_XACT_ID_MAX_LEN, "%s_%4ld_%d_%d", + "px", Abs(random() * RANDOM_LARGE_MULTIPLIER), + serverid, userid); + + /* Account for the last NULL byte */ + *prep_info_len = strlen(prep_info); + return prep_info; +} + +/* * postgresGetForeignRelSize * Estimate # of rows and width of the result of the scan * @@ -1336,7 +1376,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - fsstate->conn = GetConnection(user, false); + fsstate->conn = GetConnection(user, false, true, false); /* Assign a unique ID for my cursor */ fsstate->cursor_number = GetCursorNumber(fsstate->conn); @@ -1685,6 +1725,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate, Oid userid; ForeignTable *table; UserMapping *user; + ForeignServer *server; AttrNumber n_params; Oid typefnoid; bool isvarlena; @@ -1712,9 +1753,15 @@ postgresBeginForeignModify(ModifyTableState *mtstate, /* Get info about foreign table. */ table = GetForeignTable(RelationGetRelid(rel)); user = GetUserMapping(userid, table->serverid); + server = GetForeignServer(user->serverid); + + /* Remember this foreign server has been modified */ + FdwXactRegisterForeignServer(user->serverid, user->userid, + server_uses_two_phase_commit(server), + true); /* Open connection; report that we'll create a prepared statement. */ - fmstate->conn = GetConnection(user, true); + fmstate->conn = GetConnection(user, true, true, false); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Deconstruct fdw_private data. */ @@ -2318,6 +2365,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) RangeTblEntry *rte; Oid userid; ForeignTable *table; + ForeignServer *server; UserMapping *user; int numParams; @@ -2348,12 +2396,18 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) dmstate->rel = node->ss.ss_currentRelation; table = GetForeignTable(RelationGetRelid(dmstate->rel)); user = GetUserMapping(userid, table->serverid); + server = GetForeignServer(user->serverid); + + /* Remember this foreign server has been modified */ + FdwXactRegisterForeignServer(user->serverid, user->userid, + server_uses_two_phase_commit(server), + true); /* * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - dmstate->conn = GetConnection(user, false); + dmstate->conn = GetConnection(user, false, true, false); /* Update the foreign-join-related fields. */ if (fsplan->scan.scanrelid == 0) @@ -2650,7 +2704,7 @@ estimate_path_cost_size(PlannerInfo *root, &retrieved_attrs, NULL); /* Get the remote estimate */ - conn = GetConnection(fpinfo->user, false); + conn = GetConnection(fpinfo->user, false, true, false); get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -3910,7 +3964,7 @@ postgresAnalyzeForeignTable(Relation relation, */ table = GetForeignTable(RelationGetRelid(relation)); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, true, false); /* * Construct command to get page count for relation. @@ -4000,7 +4054,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, table = GetForeignTable(RelationGetRelid(relation)); server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, true, false); /* * Construct cursor that retrieves whole rows from remote. @@ -4223,7 +4277,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) */ server = GetForeignServer(serverOid); mapping = GetUserMapping(GetUserId(), server->serverid); - conn = GetConnection(mapping, false); + conn = GetConnection(mapping, false, true, false); /* Don't attempt to import collation if remote server hasn't got it */ if (PQserverVersion(conn) < 90100) @@ -5590,3 +5644,26 @@ find_em_expr_for_rel(EquivalenceClass *ec, RelOptInfo *rel) /* We didn't find any suitable equivalence class expression */ return NULL; } + +/* + * server_uses_two_phase_commit + * Returns true if the foreign server is configured to support 2PC. + */ +bool +server_uses_two_phase_commit(ForeignServer *server) +{ + ListCell *lc; + + /* Check the options for two phase compliance */ + foreach(lc, server->options) + { + DefElem *d = (DefElem *) lfirst(lc); + + if (strcmp(d->defname, "two_phase_commit") == 0) + { + return defGetBoolean(d); + } + } + /* By default a server is not 2PC compliant */ + return false; +} diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index d37cc88..b21a2fb 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -13,6 +13,7 @@ #ifndef POSTGRES_FDW_H #define POSTGRES_FDW_H +#include "access/fdwxact.h" #include "foreign/foreign.h" #include "lib/stringinfo.h" #include "nodes/relation.h" @@ -115,7 +116,9 @@ extern int set_transmission_modes(void); extern void reset_transmission_modes(int nestlevel); /* in connection.c */ -extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt); +extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt, + bool start_transaction, bool connection_error_ok); +extern PGconn *GetExistingConnection(Oid umid); extern void ReleaseConnection(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); @@ -123,6 +126,14 @@ extern PGresult *pgfdw_get_result(PGconn *conn, const char *query); extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query); extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql); +extern bool postgresPrepareForeignTransaction(Oid serverid, Oid userid, + Oid umid, const char *prep_id); +extern bool postgresEndForeignTransaction(Oid serverid, Oid userid, + Oid umid, bool is_commit); +extern bool postgresResolvePreparedForeignTransaction(Oid serverid, Oid userid, + Oid umid, bool is_commit, + const char *prep_id); + /* in option.c */ extern int ExtractConnectionOptions(List *defelems, @@ -179,6 +190,7 @@ extern void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, List *remote_conds, List *pathkeys, bool is_subquery, List **retrieved_attrs, List **params_list); extern const char *get_jointype_name(JoinType jointype); +extern bool server_uses_two_phase_commit(ForeignServer *server); /* in shippable.c */ extern bool is_builtin(Oid objectId); diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 0b2c528..7f74356 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -15,6 +15,10 @@ DO $d$ OPTIONS (dbname '$$||current_database()||$$', port '$$||current_setting('port')||$$' )$$; + EXECUTE $$CREATE SERVER loopback3 FOREIGN DATA WRAPPER postgres_fdw + OPTIONS (dbname '$$||current_database()||$$', + port '$$||current_setting('port')||$$' + )$$; END; $d$; @@ -22,6 +26,7 @@ CREATE USER MAPPING FOR public SERVER testserver1 OPTIONS (user 'value', password 'value'); CREATE USER MAPPING FOR CURRENT_USER SERVER loopback; CREATE USER MAPPING FOR CURRENT_USER SERVER loopback2; +CREATE USER MAPPING FOR CURRENT_USER SERVER loopback3; -- =================================================================== -- create objects used through FDW loopback server @@ -56,6 +61,14 @@ CREATE TABLE "S 1"."T 4" ( c3 text, CONSTRAINT t4_pkey PRIMARY KEY (c1) ); +CREATE TABLE "S 1"."T 5" ( + c1 int NOT NULL +); + +CREATE TABLE "S 1"."T 6" ( + c1 int NOT NULL, + CONSTRAINT t6_pkey PRIMARY KEY (c1) +); INSERT INTO "S 1"."T 1" SELECT id, @@ -88,6 +101,7 @@ ANALYZE "S 1"."T 1"; ANALYZE "S 1"."T 2"; ANALYZE "S 1"."T 3"; ANALYZE "S 1"."T 4"; +ANALYZE "S 1"."T 5"; -- =================================================================== -- create foreign tables @@ -136,6 +150,19 @@ CREATE FOREIGN TABLE ft6 ( c3 text ) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 4'); +CREATE FOREIGN TABLE ft7_not_twophase ( + c1 int NOT NULL +) SERVER loopback OPTIONS (schema_name 'S 1', table_name 'T 5'); + +CREATE FOREIGN TABLE ft8_twophase ( + c1 int NOT NULL +) SERVER loopback2 OPTIONS (schema_name 'S 1', table_name 'T 5'); + +CREATE FOREIGN TABLE ft9_twophase ( + c1 int NOT NULL +) SERVER loopback3 OPTIONS (schema_name 'S 1', table_name 'T 5'); + + -- A table with oids. CREATE FOREIGN TABLE doesn't support the -- WITH OIDS option, but ALTER does. CREATE FOREIGN TABLE ft_pg_type ( @@ -1905,3 +1932,109 @@ SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t1.a = t2.b AND t1.b = t2.a) q WHERE t1.a%25 = 0 ORDER BY 1,2; RESET enable_partition_wise_join; + +-- =================================================================== +-- test Atomic commit across foreign servers +-- =================================================================== + +ALTER SERVER loopback OPTIONS(ADD two_phase_commit 'off'); +ALTER SERVER loopback2 OPTIONS(ADD two_phase_commit 'on'); +ALTER SERVER loopback3 OPTIONS(ADD two_phase_commit 'on'); + +-- Check two_phase_commit setting +SELECT srvname FROM pg_foreign_server WHERE 'two_phase_commit=on' = ANY(srvoptions) or 'two_phase_commit=off' = ANY(srvoptions); + +-- modify one supported server and commit. +BEGIN; +INSERT INTO ft8_twophase VALUES(1); +COMMIT; +SELECT * FROM ft8_twophase; + +-- modify one supported server and rollback. +BEGIN; +INSERT INTO ft8_twophase VALUES(1); +ROLLBACK; +SELECT * FROM ft8_twophase; + +-- modify two supported server and commit. +BEGIN; +INSERT INTO ft8_twophase VALUES(2); +INSERT INTO ft9_twophase VALUES(2); +COMMIT; +SELECT * FROM ft8_twophase; +SELECT * FROM ft9_twophase; + +-- modify two supported server and rollback. +BEGIN; +INSERT INTO ft8_twophase VALUES(3); +INSERT INTO ft9_twophase VALUES(3); +ROLLBACK; +SELECT * FROM ft8_twophase; +SELECT * FROM ft9_twophase; + +-- modify local and one supported server and commit. +BEGIN; +INSERT INTO ft8_twophase VALUES(4); +INSERT INTO "S 1"."T 6" VALUES (4); +COMMIT; +SELECT * FROM ft8_twophase; +SELECT * FROM "S 1"."T 6"; + +-- modify local and one supported server and rollback. +BEGIN; +INSERT INTO ft8_twophase VALUES(5); +INSERT INTO "S 1"."T 6" VALUES (5); +ROLLBACK; +SELECT * FROM ft8_twophase; +SELECT * FROM "S 1"."T 6"; + +-- modify supported server and non-supported server and commit. +BEGIN; +INSERT INTO ft7_not_twophase VALUES(6); +INSERT INTO ft8_twophase VALUES(6); +COMMIT; +SELECT * FROM ft7_not_twophase; +SELECT * FROM ft8_twophase; + +-- modify supported server and non-supported server and rollback. +BEGIN; +INSERT INTO ft7_not_twophase VALUES(7); +INSERT INTO ft8_twophase VALUES(7); +ROLLBACK; +SELECT * FROM ft7_not_twophase; +SELECT * FROM ft8_twophase; + +-- modify foreign server and raise an error +BEGIN; +INSERT INTO ft8_twophase VALUES(8); +INSERT INTO ft9_twophase VALUES(NULL); -- violation +COMMIT; +SELECT * FROM ft8_twophase; +SELECT * FROM ft9_twophase; + +-- commit and rollback foreign transactions that are part of +-- prepare transaction. +BEGIN; +INSERT INTO ft8_twophase VALUES(9); +INSERT INTO ft9_twophase VALUES(9); +PREPARE TRANSACTION 'gx1'; +COMMIT PREPARED 'gx1'; +SELECT * FROM ft8_twophase; +SELECT * FROM ft9_twophase; + +BEGIN; +INSERT INTO ft8_twophase VALUES(10); +INSERT INTO ft9_twophase VALUES(10); +PREPARE TRANSACTION 'gx1'; +ROLLBACK PREPARED 'gx1'; +SELECT * FROM ft8_twophase; +SELECT * FROM ft9_twophase; + +-- fails, cannot prepare the transaction if non-supporeted +-- server involved in. +BEGIN; +INSERT INTO ft7_not_twophase VALUES(11); +INSERT INTO ft8_twophase VALUES(11); +PREPARE TRANSACTION 'gx1'; +SELECT * FROM ft7_not_twophase; +SELECT * FROM ft8_twophase; diff --git a/doc/src/sgml/postgres-fdw.sgml b/doc/src/sgml/postgres-fdw.sgml index 54b5e98..f065b7b 100644 --- a/doc/src/sgml/postgres-fdw.sgml +++ b/doc/src/sgml/postgres-fdw.sgml @@ -436,6 +436,43 @@ + + + Transaction Management Options + + + By default, if the transaction involves with multiple remote server, + each transaction on remote server is committed or aborted independently. + Some of transactions may fail to commit on remote server while other + transactions commit successfully. This may be overridden using + following option: + + + + + + two_phase_commit + + + This option controls whether postgres_fdw allows + to use two-phase-commit when transaction commits. This option can + only be sepcified for foreign servers, not per-table. + The default is false. + + + + If this option is enabled, postgres_fdw prepares + transaction on remote server and PostgreSQL + keeps track of the distributed transaction. + must be set more + than 1 on local server and + must set to more than 1 on remote server. + + + + + + -- 1.7.1