diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 8c64d42dda..cc9aeab799 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -23,12 +23,14 @@ #include "postgres_fdw.h" #include "storage/fd.h" #include "storage/latch.h" +#include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/datetime.h" #include "utils/hsearch.h" #include "utils/inval.h" #include "utils/memutils.h" #include "utils/syscache.h" +#include "utils/timeout.h" /* * Connection cache hash table entry @@ -117,6 +119,10 @@ static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel); static bool UserMappingPasswordRequired(UserMapping *user); static bool disconnect_cached_connections(Oid serverid); +static void pgfdw_connection_check(void); +static bool pgfdw_connection_check_internal(PGconn *conn); +static TimeoutId pgfdw_health_check_timeout = MAX_TIMEOUTS; +int pgfdw_health_check_interval; /* * Get a PGconn which can be used to execute queries on the remote PostgreSQL @@ -161,6 +167,9 @@ GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state) pgfdw_inval_callback, (Datum) 0); CacheRegisterSyscacheCallback(USERMAPPINGOID, pgfdw_inval_callback, (Datum) 0); + + /* Register a timeout for checking remote servers */ + pgfdw_health_check_timeout = RegisterTimeout(USER_TIMEOUT, pgfdw_connection_check); } /* Set flag that we did GetConnection during the current transaction */ @@ -284,6 +293,12 @@ GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state) if (state) *state = &entry->state; + /* Fire timeout if needed */ + if (pgfdw_health_check_interval > 0 && + !get_timeout_active(pgfdw_health_check_timeout)) + enable_timeout_after(pgfdw_health_check_timeout, + pgfdw_health_check_interval); + return entry->conn; } @@ -1040,6 +1055,9 @@ pgfdw_xact_callback(XactEvent event, void *arg) */ xact_got_connection = false; + /* stop timer because checking is no more needed. */ + disable_timeout(pgfdw_health_check_timeout, false); + /* Also reset cursor numbering for next transaction */ cursor_number = 0; } @@ -1889,3 +1907,133 @@ disconnect_cached_connections(Oid serverid) return result; } + +/* + * Signal handler for checking remote servers. + * + * This function searches the hash table from the beginning + * and performs a health-check on each entry. + * + * Raise SIGINT if someone might be down, otherwise do nothing. + */ +void +pgfdw_connection_check(void) +{ + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + + Assert(ConnectionHash); + + /* + * checking will be done by waiting WL_SOCKET_CLOSED event, + * so exit immediately if it cannot be used in this system. + */ + if (!WaitEventSetCanReportClosed()) + return; + + /* Quick exit if QueryCancelMessage has already set. */ + if (QueryCancelMessage != NULL) + return; + + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + if (entry->conn == NULL || entry->xact_depth == 0) + continue; + if (!pgfdw_connection_check_internal(entry->conn)) + { + /* + * Foreign server might be down, so raise SIGINT. + * Note that error message is passed to QueryCancelMessage + * for reporting error in ProcessInterrupts(). + */ + char msg[31 + MAXDATELEN]; + MemoryContext old; + ForeignServer *server; + + /* + * Switch to CurTransactionContext in order to + * make sure that the lifetime of palloc'd is transaction. + */ + old = MemoryContextSwitchTo(CurTransactionContext); + server = GetForeignServer(entry->serverid); + snprintf(msg, sizeof(msg), "Foreign Server %s might be down.", server->servername); + QueryCancelMessage = pstrdup(msg); + MemoryContextSwitchTo(old); + + disconnect_pg_server(entry); + hash_seq_term(&scan); + + raise(SIGINT); + break; + } + } + + /* re-schedule timer if needed. */ + if (pgfdw_health_check_interval > 0) + enable_timeout_after(pgfdw_health_check_timeout, + pgfdw_health_check_interval); + + return; +} + +/* + * helper function for pgfdw_connection_check + */ +static bool +pgfdw_connection_check_internal(PGconn *conn) +{ + WaitEventSet *eventset; + WaitEvent events; + + Assert(WaitEventSetCanReportClosed()); + + eventset = CreateWaitEventSet(CurrentMemoryContext, 1); + AddWaitEventToSet(eventset, WL_SOCKET_CLOSED, PQsocket(conn), NULL, NULL); + + WaitEventSetWait(eventset, 0, &events, 1, 0); + + if (events.events & WL_SOCKET_CLOSED) + { + FreeWaitEventSet(eventset); + return false; + } + FreeWaitEventSet(eventset); + + return true; +} + +bool +check_pgfdw_health_check_interval(int *newval, void **extra, GucSource source) +{ + if (!WaitEventSetCanReportClosed() && *newval != 0) + { + GUC_check_errdetail("pgfdw_health_check_interval must be set to 0 on this platform"); + return false; + } + return true; +} + +void +assign_pgfdw_health_check_interval(int newval, void *extra) +{ + /* Quick return if timeout is not registered yet. */ + if (pgfdw_health_check_timeout == MAX_TIMEOUTS) + return; + + if (get_timeout_active(pgfdw_health_check_timeout)) + { + if (newval == 0) + disable_timeout(pgfdw_health_check_timeout, false); + + /* + * we don't have to do anything because + * new value will be used in pgfdw_connection_check(). + */ + return; + } + + /* Start timeout if wants to */ + if (newval > 0) + enable_timeout_after(pgfdw_health_check_timeout, newval); +} diff --git a/contrib/postgres_fdw/option.c b/contrib/postgres_fdw/option.c index 572591a558..496a27960c 100644 --- a/contrib/postgres_fdw/option.c +++ b/contrib/postgres_fdw/option.c @@ -540,5 +540,18 @@ _PG_init(void) NULL, NULL); + DefineCustomIntVariable("postgres_fdw.health_check_interval", + "Sets the time interval between checks of remote servers.", + NULL, + &pgfdw_health_check_interval, + 0, + 0, + INT_MAX, + PGC_USERSET, + GUC_UNIT_MS, + check_pgfdw_health_check_interval, + assign_pgfdw_health_check_interval, + NULL); + MarkGUCPrefixReserved("postgres_fdw"); } diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 8ae79e97e4..c129af5082 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -18,6 +18,7 @@ #include "libpq-fe.h" #include "nodes/execnodes.h" #include "nodes/pathnodes.h" +#include "utils/guc.h" #include "utils/relcache.h" /* @@ -151,6 +152,10 @@ extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state); extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn, bool clear, const char *sql); +extern bool check_pgfdw_health_check_interval(int *newval, void **extra, + GucSource source); +extern void assign_pgfdw_health_check_interval(int newval, void *extra); +extern int pgfdw_health_check_interval; /* in option.c */ extern int ExtractConnectionOptions(List *defelems,