Re: Streaming Replication on win32 - Mailing list pgsql-hackers
From | Heikki Linnakangas |
---|---|
Subject | Re: Streaming Replication on win32 |
Date | |
Msg-id | 4B584C99.8090004@enterprisedb.com Whole thread Raw |
In response to | Re: Streaming Replication on win32 (Heikki Linnakangas <heikki.linnakangas@enterprisedb.com>) |
Responses |
Re: Streaming Replication on win32
|
List | pgsql-hackers |
Heikki Linnakangas wrote: > Magnus Hagander wrote: >> 2010/1/17 Heikki Linnakangas <heikki.linnakangas@enterprisedb.com>: >>> We could replace the blocking PQexec() calls with PQsendQuery(), and use >>> the emulated version of select() to wait. >> Hmm. That would at least theoretically work, but aren't there still >> places we may end up blocking further down? Or are those ok? > > There's also PQconnect that needs similar treatment (using > PQconnectStart/Poll()), but that's it. So here's a patch implementing that for contrib/dblink. Walreceiver needs the same treatment. The implementation should be shared between the two, but I'm not sure how. We can't just put the wrapper functions to a module in src/backend/port/, because the wrapper functions depend on libpq. Maybe put them in a new header file as static functions, and include that in contrib/dblink/dblink.c and src/backend/replication/libpqwalreceiver.c. -- Heikki Linnakangas EnterpriseDB http://www.enterprisedb.com diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 2c1d7a2..fa11709 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -34,6 +34,14 @@ #include <limits.h> +#ifdef WIN23 +/* These are needed by the interruptible libpq function replacements */ +#include <time.h> +#include <unistd.h> +#include <sys/time.h> +#include <sys/types.h> +#endif + #include "libpq-fe.h" #include "fmgr.h" #include "funcapi.h" @@ -101,6 +109,193 @@ static void dblink_res_error(const char *conname, PGresult *res, const char *dbl static char *get_connect_string(const char *servername); static char *escape_param_str(const char *from); +#ifdef WIN23 +/* + * Replacement functions for blocking libpq functions, for Windows. + * + * On Windows, the vanilla select() function doesn't react to our emulated + * signals. PQexec() and PQconnectdb() use select(), so they're + * uninterruptible. These replacement functions use the corresponding + * asynchronous libpq functions and backend version of select() to implement + * the same functionality, but in a way that's interrupted by signals. + * + * These work on other platforms as well, but presumably it's more efficient + * to let libpq block. + */ + +static PGresult * +dblink_PQexec(PGconn *conn, const char *command) +{ + int sock; + PGresult *result, + *lastResult; + + /* Send query. This can block too, but we ignore that for now. */ + if (PQsendQuery(conn, command) == 0) + return NULL; + + /* Wait for response */ + sock = PQsocket(conn); + + while(PQisBusy(conn)) + { + fd_set input_mask; + + FD_ZERO(&input_mask); + + FD_SET (sock, &input_mask); + + /* + * Note that we don't check the return code. We assume that + * PQconsumeInput() will get the same error, and set the result + * as failed. + */ + select(sock + 1, &input_mask, NULL, NULL, NULL); + PQconsumeInput(conn); + } + + /* + * Emulate PQexec()'s behavior of returning the *last* result, if + * there's many. dblink doesn't normally issue statements that return + * multiple results, but the user-supplied SQL statement passed to + * dblink() might. You'll only get the last result back, so it's not a + * very sensible thing to do, but we must still handle it gracefully. + * + * We don't try to concatenate error messages like PQexec() does. + * Doesn't seem worth the effort. + */ + lastResult = NULL; + while((result = PQgetResult(conn)) != NULL) + { + if (lastResult != NULL) + { + if (PQresultStatus(lastResult) != PGRES_COMMAND_OK && + PQresultStatus(lastResult) != PGRES_TUPLES_OK) + { + PQclear(result); + result = lastResult; + } + else + PQclear(lastResult); + } + lastResult = result; + } + + return lastResult; +} + +static PGconn * +dblink_PQconnectdb(const char *conninfo) +{ + PGconn *conn; + PostgresPollingStatusType status; + PQconninfoOption *options; + int timeout_secs = 0; + time_t end_time; + int sock; + + conn = PQconnectStart(conninfo); + if (conn == NULL) + return NULL; + + if (PQstatus(conn) == CONNECTION_BAD) + return conn; + + /* Extract timeout from the connection string */ + options = PQconninfoParse(conninfo, NULL); + if (options) + { + PQconninfoOption *option; + for (option = options; option->keyword != NULL; option++) + { + if (strcmp(option->keyword, "connect_timeout") == 0) + { + if (option->val != NULL && option->val[0] != '\0') + { + timeout_secs = atoi(option->val); + break; + } + } + } + PQconninfoFree(options); + } + if (timeout_secs > 0) + end_time = time(NULL) + timeout_secs; + + sock = PQsocket(conn); + + /* Wait for connection to be established */ + for (;;) + { + fd_set input_mask; + fd_set output_mask; + time_t now; + struct timeval timeout; + struct timeval *timeout_ptr; + + FD_ZERO(&input_mask); + FD_ZERO(&output_mask); + + status = PQconnectPoll(conn); + switch(status) + { + case PGRES_POLLING_OK: + case PGRES_POLLING_FAILED: + return conn; + + case PGRES_POLLING_READING: + FD_SET(sock, &input_mask); + break; + + case PGRES_POLLING_WRITING: + FD_SET(sock, &output_mask); + break; + + default: + elog(ERROR, "unknown PQconnectPoll() return value: %d", status); + } + + if (timeout_secs > 0) + { + now = time(NULL); + timeout.tv_sec = (now > end_time) ? 0 : (end_time - now); + timeout.tv_usec = 0; + timeout_ptr = &timeout; + } + else + timeout_ptr = NULL; + + /* + * Note that we don't check an error code. We assume that + * PQconnectPoll() will get the same error, and return failure. + */ + if (select(sock + 1, &input_mask, &output_mask, NULL, timeout_ptr) == 0) + { + /* Timeout */ + PQfinish(conn); + + /* + * This message is subtly different from the one from the message + * you get on other platforms, where PQconnectdb() handles the + * timeout. The "timeout expired" message here gets translated + * using the backend .po file, while the message emitted by + * PQconnectdb() is translated using libpq .po file. I hope that + * makes no difference in practice. + */ + ereport(ERROR, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg("could not establish connection"), + errdetail("timeout expired"))); + } + } + return NULL; /* not reached, keep compiler quiet */ +} + +#define PQexec(conn, command) dblink_PQexec(conn, command) +#define PQconnectdb(conninfo) dblink_PQconnectdb(conninfo) + +#endif + /* Global */ static remoteConn *pconn = NULL; static HTAB *remoteConnHash = NULL;
pgsql-hackers by date: