From f8aaf91d0fdb88c6726c45f924d8dea19bf98cff Mon Sep 17 00:00:00 2001 From: Amul Sul Date: Wed, 3 Aug 2022 10:17:09 -0400 Subject: [PATCH v2 4/4] TRIAL: cleanup xact callback - WIP --- contrib/postgres_fdw/connection.c | 420 +++++++++++++++--------------- 1 file changed, 213 insertions(+), 207 deletions(-) diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 6e23046ad69..181d2f83e32 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -110,9 +110,8 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors); static bool pgfdw_get_result_timed(PGconn *conn, TimestampTz endtime, PGresult **result, bool *timed_out); -static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel); -static bool pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql, - List **pending_entries, bool toplevel); +static void pgfdw_abort_cleanup(bool toplevel); +static void pgfdw_exec_pre_commit(bool toplevel); static void pgfdw_finish_pre_commit(List *pending_entries, const char *sql, bool toplevel); static bool UserMappingPasswordRequired(UserMapping *user); @@ -880,80 +879,47 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, static void pgfdw_xact_callback(XactEvent event, void *arg) { - HASH_SEQ_STATUS scan; - ConnCacheEntry *entry; - List *pending_entries = NIL; - /* Quick exit if no connections were touched in this transaction. */ if (!xact_got_connection) return; - /* - * Scan all connection cache entries to find open remote transactions, and - * close them. - */ - hash_seq_init(&scan, ConnectionHash); - while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + switch (event) { - /* Ignore cache entry if no open connection right now */ - if (entry->conn == NULL) - continue; - - /* If it has an open remote transaction, try to close it */ - if (entry->xact_depth > 0) - { - elog(DEBUG3, "closing remote transaction on connection %p", - entry->conn); + case XACT_EVENT_PARALLEL_PRE_COMMIT: + case XACT_EVENT_PRE_COMMIT: - switch (event) - { - case XACT_EVENT_PARALLEL_PRE_COMMIT: - case XACT_EVENT_PRE_COMMIT: - - /* Commit all remote transactions during pre-commit */ - if (pgfdw_exec_pre_commit(entry, "COMMIT TRANSACTION", - &pending_entries, true)) - continue; - break; - case XACT_EVENT_PRE_PREPARE: - - /* - * We disallow any remote transactions, since it's not - * very reasonable to hold them open until the prepared - * transaction is committed. For the moment, throw error - * unconditionally; later we might allow read-only cases. - * Note that the error will cause us to come right back - * here with event == XACT_EVENT_ABORT, so we'll clean up - * the connection state at that point. - */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables"))); - break; - case XACT_EVENT_PARALLEL_COMMIT: - case XACT_EVENT_COMMIT: - case XACT_EVENT_PREPARE: - /* Pre-commit should have closed the open transaction */ - elog(ERROR, "missed cleaning up connection during pre-commit"); - break; - case XACT_EVENT_PARALLEL_ABORT: - case XACT_EVENT_ABORT: - /* Rollback all remote transactions during abort */ - pgfdw_abort_cleanup(entry, true); - break; - } - } + /* Commit all remote transactions during pre-commit */ + pgfdw_exec_pre_commit(true); + break; - /* Reset state to show we're out of a transaction */ - pgfdw_reset_xact_state(entry, true); - } + case XACT_EVENT_PRE_PREPARE: - /* If there are any pending connections, finish cleaning them up */ - if (pending_entries) - { - Assert(event == XACT_EVENT_PARALLEL_PRE_COMMIT || - event == XACT_EVENT_PRE_COMMIT); - pgfdw_finish_pre_commit(pending_entries, "COMMIT TRANSACTION", true); + /* + * We disallow any remote transactions, since it's not + * very reasonable to hold them open until the prepared + * transaction is committed. For the moment, throw error + * unconditionally; later we might allow read-only cases. + * Note that the error will cause us to come right back + * here with event == XACT_EVENT_ABORT, so we'll clean up + * the connection state at that point. + */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables"))); + break; + + case XACT_EVENT_PARALLEL_COMMIT: + case XACT_EVENT_COMMIT: + case XACT_EVENT_PREPARE: + /* Pre-commit should have closed the open transaction */ + elog(ERROR, "missed cleaning up connection during pre-commit"); + break; + + case XACT_EVENT_PARALLEL_ABORT: + case XACT_EVENT_ABORT: + /* Rollback all remote transactions during abort */ + pgfdw_abort_cleanup(true); + break; } /* @@ -974,12 +940,6 @@ static void pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg) { - HASH_SEQ_STATUS scan; - ConnCacheEntry *entry; - int curlevel; - List *pending_entries = NIL; - char sql[100]; - /* Nothing to do at subxact start, nor after commit. */ if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB || event == SUBXACT_EVENT_ABORT_SUB)) @@ -989,48 +949,15 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid, if (!xact_got_connection) return; - /* - * Scan all connection cache entries to find open remote subtransactions - * of the current level, and close them. - */ - curlevel = GetCurrentTransactionNestLevel(); - snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel); - - hash_seq_init(&scan, ConnectionHash); - while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + if (event == SUBXACT_EVENT_PRE_COMMIT_SUB) { - /* - * We only care about connections with open remote subtransactions of - * the current level. - */ - if (entry->conn == NULL || entry->xact_depth < curlevel) - continue; - - if (entry->xact_depth > curlevel) - elog(ERROR, "missed cleaning up remote subtransaction at level %d", - entry->xact_depth); - - if (event == SUBXACT_EVENT_PRE_COMMIT_SUB) - { - /* Commit all remote subtransactions during pre-commit */ - if (pgfdw_exec_pre_commit(entry, sql, &pending_entries, false)) - continue; - } - else - { - /* Rollback all remote subtransactions during abort */ - pgfdw_abort_cleanup(entry, false); - } - - /* OK, we're outta that level of subtransaction */ - pgfdw_reset_xact_state(entry, false); + /* Commit all remote subtransactions during pre-commit */ + pgfdw_exec_pre_commit(false); } - - /* If there are any pending connections, finish cleaning them up */ - if (pending_entries) + else { - Assert(event == SUBXACT_EVENT_PRE_COMMIT_SUB); - pgfdw_finish_pre_commit(pending_entries, sql, false); + /* Rollback all remote subtransactions during abort */ + pgfdw_abort_cleanup(false); } } @@ -1394,131 +1321,210 @@ exit: ; * Set entry->changing_xact_state to false on success, true on failure. */ static void -pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel) +pgfdw_abort_cleanup(bool toplevel) { + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; char sql[100]; /* - * Don't try to clean up the connection if we're already in error - * recursion trouble. + * Scan all connection cache entries to find open remote transactions, and + * close them. */ - if (in_error_recursion_trouble()) - entry->changing_xact_state = true; + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + /* Ignore cache entry if no open connection right now */ + if (entry->conn == NULL) + continue; - /* - * If connection is already unsalvageable, don't touch it further. - */ - if (entry->changing_xact_state) - return; + /* Sanity check for subtransaction */ + if (!toplevel) + { + int curlevel = GetCurrentTransactionNestLevel(); - /* - * Mark this connection as in the process of changing transaction state. - */ - entry->changing_xact_state = true; + if (entry->xact_depth < curlevel) + continue; - /* Assume we might have lost track of prepared statements */ - entry->have_error = true; + if (entry->xact_depth > curlevel) + elog(ERROR, "missed cleaning up remote subtransaction at level %d", + entry->xact_depth); + } - /* - * If a command has been submitted to the remote server by using an - * asynchronous execution function, the command might not have yet - * completed. Check to see if a command is still being processed by the - * remote server, and if so, request cancellation of the command. - */ - if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE && - !pgfdw_cancel_query(entry->conn)) - return; /* Unable to cancel running query */ + /* If it has an open remote transaction, try to close it */ + if (entry->xact_depth > 0) + { + /* + * Don't try to clean up the connection if we're already in error + * recursion trouble. + */ + if (in_error_recursion_trouble()) + entry->changing_xact_state = true; - if (toplevel) - snprintf(sql, sizeof(sql), "ABORT TRANSACTION"); - else - snprintf(sql, sizeof(sql), - "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", - entry->xact_depth, entry->xact_depth); - if (!pgfdw_exec_cleanup_query(entry->conn, sql, false)) - return; /* Unable to abort remote (sub)transaction */ + /* + * If connection is already unsalvageable, don't touch it further. + */ + if (entry->changing_xact_state) + goto xact_abort_end; - if (toplevel) - { - if (entry->have_prep_stmt && entry->have_error && - !pgfdw_exec_cleanup_query(entry->conn, - "DEALLOCATE ALL", - true)) - return; /* Trouble clearing prepared statements */ + /* + * Mark this connection as in the process of changing transaction state. + */ + entry->changing_xact_state = true; - entry->have_prep_stmt = false; - entry->have_error = false; - } + /* Assume we might have lost track of prepared statements */ + entry->have_error = true; - /* - * If pendingAreq of the per-connection state is not NULL, it means that - * an asynchronous fetch begun by fetch_more_data_begin() was not done - * successfully and thus the per-connection state was not reset in - * fetch_more_data(); in that case reset the per-connection state here. - */ - if (entry->state.pendingAreq) - memset(&entry->state, 0, sizeof(entry->state)); + /* + * If a command has been submitted to the remote server by using an + * asynchronous execution function, the command might not have yet + * completed. Check to see if a command is still being processed by the + * remote server, and if so, request cancellation of the command. + */ + if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE && + !pgfdw_cancel_query(entry->conn)) + goto xact_abort_end; /* Unable to cancel running query */ - /* Disarm changing_xact_state if it all worked */ - entry->changing_xact_state = false; + if (toplevel) + snprintf(sql, sizeof(sql), "ABORT TRANSACTION"); + else + snprintf(sql, sizeof(sql), + "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", + entry->xact_depth, entry->xact_depth); + if (!pgfdw_exec_cleanup_query(entry->conn, sql, false)) + goto xact_abort_end; /* Unable to abort remote (sub)transaction */ + + if (toplevel) + { + if (entry->have_prep_stmt && entry->have_error && + !pgfdw_exec_cleanup_query(entry->conn, + "DEALLOCATE ALL", + true)) + goto xact_abort_end; /* Trouble clearing prepared statements */ + + entry->have_prep_stmt = false; + entry->have_error = false; + } + + /* + * If pendingAreq of the per-connection state is not NULL, it means that + * an asynchronous fetch begun by fetch_more_data_begin() was not done + * successfully and thus the per-connection state was not reset in + * fetch_more_data(); in that case reset the per-connection state here. + */ + if (entry->state.pendingAreq) + memset(&entry->state, 0, sizeof(entry->state)); + + /* Disarm changing_xact_state if it all worked */ + entry->changing_xact_state = false; + } + +xact_abort_end: + /* Reset state to show we're out of a transaction */ + pgfdw_reset_xact_state(entry, toplevel); + } } /* * Commit all remote transactions or subtransactions during pre-commit. * - * If parallel_commit is enabled at this connection cache entry and - * the result of "sql" needs to be gotten later, return true and append - * this entry to "pending_entries". - * * "toplevel" should be set to true if toplevel (main) transaction is * committed, false otherwise. */ -static bool -pgfdw_exec_pre_commit(ConnCacheEntry *entry, const char *sql, - List **pending_entries, bool toplevel) +static void +pgfdw_exec_pre_commit(bool toplevel) { - PGresult *res; - - /* - * If abort cleanup previously failed for this connection, we can't issue - * any more commands against it. - */ - pgfdw_reject_incomplete_xact_state_change(entry); + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + List *pending_entries = NIL; + int curlevel = 0; + char sql[100]; - entry->changing_xact_state = true; - if (entry->parallel_commit) + /* Form SQL query */ + if (toplevel) { - do_sql_command_begin(entry->conn, sql); - *pending_entries = lappend(*pending_entries, entry); - return true; + snprintf(sql, sizeof(sql), "COMMIT TRANSACTION"); + } + else + { + curlevel = GetCurrentTransactionNestLevel(); + snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel); } - do_sql_command(entry->conn, sql); - entry->changing_xact_state = false; - - if (!toplevel) - return false; /* - * If there were any errors in subtransactions, and we made prepared - * statements, do a DEALLOCATE ALL to make sure we get rid of all prepared - * statements. This is annoying and not terribly bulletproof, but it's - * probably not worth trying harder. - * - * DEALLOCATE ALL only exists in 8.3 and later, so this constrains how old - * a server postgres_fdw can communicate with. We intentionally ignore - * errors in the DEALLOCATE, so that we can hobble along to some extent - * with older servers (leaking prepared statements as we go; but we don't - * really support update operations pre-8.3 anyway). + * Scan all connection cache entries to find open remote transactions, and + * close them. */ - if (entry->have_prep_stmt && entry->have_error) + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) { - res = PQexec(entry->conn, "DEALLOCATE ALL"); - PQclear(res); + /* Ignore cache entry if no open connection right now */ + if (entry->conn == NULL) + continue; + + /* Sanity check for subtransaction */ + if (!toplevel) + { + if (entry->xact_depth < curlevel) + continue; + + if (entry->xact_depth > curlevel) + elog(ERROR, "missed cleaning up remote subtransaction at level %d", + entry->xact_depth); + } + + /* If it has an open remote transaction, try to close it */ + if (entry->xact_depth > 0) + { + /* + * If abort cleanup previously failed for this connection, we can't issue + * any more commands against it. + */ + pgfdw_reject_incomplete_xact_state_change(entry); + + entry->changing_xact_state = true; + if (entry->parallel_commit) + { + do_sql_command_begin(entry->conn, sql); + pending_entries = lappend(pending_entries, entry); + continue; + } + do_sql_command(entry->conn, sql); + entry->changing_xact_state = false; + + if (toplevel) + { + /* + * If there were any errors in subtransactions, and we made prepared + * statements, do a DEALLOCATE ALL to make sure we get rid of all prepared + * statements. This is annoying and not terribly bulletproof, but it's + * probably not worth trying harder. + * + * DEALLOCATE ALL only exists in 8.3 and later, so this constrains how old + * a server postgres_fdw can communicate with. We intentionally ignore + * errors in the DEALLOCATE, so that we can hobble along to some extent + * with older servers (leaking prepared statements as we go; but we don't + * really support update operations pre-8.3 anyway). + */ + if (entry->have_prep_stmt && entry->have_error) + { + PGresult *res; + + res = PQexec(entry->conn, "DEALLOCATE ALL"); + PQclear(res); + } + entry->have_prep_stmt = false; + entry->have_error = false; + } + } + + /* Reset state to show we're out of a transaction */ + pgfdw_reset_xact_state(entry, toplevel); } - entry->have_prep_stmt = false; - entry->have_error = false; - return false; + /* If there are any pending connections, finish cleaning them up */ + if (pending_entries) + pgfdw_finish_pre_commit(pending_entries, sql, toplevel); } /* -- 2.18.0