diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 08daf26fdf..413d603a03 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -59,6 +59,7 @@ typedef struct ConnCacheEntry bool invalidated; /* true if reconnect is pending */ uint32 server_hashvalue; /* hash value of foreign server OID */ uint32 mapping_hashvalue; /* hash value of user mapping OID */ + PgFdwConnState state; /* extra per-connection state */ } ConnCacheEntry; /* @@ -105,7 +106,7 @@ static bool UserMappingPasswordRequired(UserMapping *user); * (not even on error), we need this flag to cue manual cleanup. */ PGconn * -GetConnection(UserMapping *user, bool will_prep_stmt) +GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state) { bool found; ConnCacheEntry *entry; @@ -197,6 +198,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt) entry->mapping_hashvalue = GetSysCacheHashValue1(USERMAPPINGOID, ObjectIdGetDatum(user->umid)); + memset(&entry->state, 0, sizeof(entry->state)); /* Now try to make the connection */ entry->conn = connect_pg_server(server, user); @@ -213,6 +215,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* Remember if caller will prepare statements */ entry->have_prep_stmt |= will_prep_stmt; + /* If caller needs access to the per-connection state, return it. */ + if (state) + *state = &entry->state; + return entry->conn; } diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 90db550b92..540ed3e711 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -6973,7 +6973,7 @@ INSERT INTO a(aa) VALUES('aaaaa'); INSERT INTO b(aa) VALUES('bbb'); INSERT INTO b(aa) VALUES('bbbb'); INSERT INTO b(aa) VALUES('bbbbb'); -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+------- a | aaa @@ -7001,7 +7001,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+-------- a | aaa @@ -7029,7 +7029,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) UPDATE b SET aa = 'new'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+-------- a | aaa @@ -7057,7 +7057,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) UPDATE a SET aa = 'newtoo'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+-------- a | newtoo @@ -7085,7 +7085,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) DELETE FROM a; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+---- (0 rows) @@ -7127,23 +7127,28 @@ insert into bar2 values(3,33,33); insert into bar2 values(4,44,44); insert into bar2 values(7,77,77); explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for update; - QUERY PLAN ----------------------------------------------------------------------------------------------- +select * from bar where f1 in (select f1 from foo) order by 1 for update; + QUERY PLAN +----------------------------------------------------------------------------------------------------------------- LockRows Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid - -> Hash Join + -> Merge Join Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid Inner Unique: true - Hash Cond: (bar.f1 = foo.f1) - -> Append - -> Seq Scan on public.bar bar_1 + Merge Cond: (bar.f1 = foo.f1) + -> Merge Append + Sort Key: bar.f1 + -> Sort Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid + Sort Key: bar_1.f1 + -> Seq Scan on public.bar bar_1 + Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid -> Foreign Scan on public.bar2 bar_2 Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid - Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE - -> Hash + Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR UPDATE + -> Sort Output: foo.ctid, foo.f1, foo.*, foo.tableoid + Sort Key: foo.f1 -> HashAggregate Output: foo.ctid, foo.f1, foo.*, foo.tableoid Group Key: foo.f1 @@ -7153,9 +7158,9 @@ select * from bar where f1 in (select f1 from foo) for update; -> Foreign Scan on public.foo2 foo_2 Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 -(23 rows) +(28 rows) -select * from bar where f1 in (select f1 from foo) for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; f1 | f2 ----+---- 1 | 11 @@ -7165,23 +7170,28 @@ select * from bar where f1 in (select f1 from foo) for update; (4 rows) explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for share; - QUERY PLAN ----------------------------------------------------------------------------------------------- +select * from bar where f1 in (select f1 from foo) order by 1 for share; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------- LockRows Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid - -> Hash Join + -> Merge Join Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid Inner Unique: true - Hash Cond: (bar.f1 = foo.f1) - -> Append - -> Seq Scan on public.bar bar_1 + Merge Cond: (bar.f1 = foo.f1) + -> Merge Append + Sort Key: bar.f1 + -> Sort Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid + Sort Key: bar_1.f1 + -> Seq Scan on public.bar bar_1 + Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid -> Foreign Scan on public.bar2 bar_2 Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid - Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR SHARE - -> Hash + Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR SHARE + -> Sort Output: foo.ctid, foo.f1, foo.*, foo.tableoid + Sort Key: foo.f1 -> HashAggregate Output: foo.ctid, foo.f1, foo.*, foo.tableoid Group Key: foo.f1 @@ -7191,9 +7201,9 @@ select * from bar where f1 in (select f1 from foo) for share; -> Foreign Scan on public.foo2 foo_2 Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 -(23 rows) +(28 rows) -select * from bar where f1 in (select f1 from foo) for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; f1 | f2 ----+---- 1 | 11 diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index a31abce7c9..d5f5a7d4e8 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -21,6 +21,7 @@ #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" +#include "executor/execReady.h" #include "foreign/fdwapi.h" #include "funcapi.h" #include "miscadmin.h" @@ -159,6 +160,9 @@ typedef struct PgFdwScanState MemoryContext temp_cxt; /* context for per-tuple temporary data */ int fetch_size; /* number of tuples per fetch */ + + /* per-connection state */ + PgFdwConnState *conn_state; } PgFdwScanState; /* @@ -392,6 +396,8 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root, RelOptInfo *output_rel, void *extra); +static int postgresReady(ForeignScanState *node); + /* * Helper functions */ @@ -419,6 +425,7 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg); static void create_cursor(ForeignScanState *node); +static void fetch_more_data_begin(ForeignScanState *node); static void fetch_more_data(ForeignScanState *node); static void close_cursor(PGconn *conn, unsigned int cursor_number); static PgFdwModifyState *create_foreign_modify(EState *estate, @@ -558,6 +565,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for upper relation push-down */ routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; + /* Support for asynchrony */ + routine->Ready = postgresReady; + PG_RETURN_POINTER(routine); } @@ -1433,7 +1443,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - fsstate->conn = GetConnection(user, false); + fsstate->conn = GetConnection(user, false, &fsstate->conn_state); /* Assign a unique ID for my cursor */ fsstate->cursor_number = GetCursorNumber(fsstate->conn); @@ -1484,6 +1494,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) &fsstate->param_flinfo, &fsstate->param_exprs, &fsstate->param_values); + fsstate->conn_state->async_query_sent = false; } /* @@ -1596,6 +1607,13 @@ postgresEndForeignScan(ForeignScanState *node) if (fsstate == NULL) return; + /* + * If we're ending before we've collected a response from an asynchronous + * query, we have to consume the response. + */ + if (fsstate->conn_state->async_query_sent) + fetch_more_data(node); + /* Close the cursor if open, to prevent accumulation of cursors */ if (fsstate->cursor_exists) close_cursor(fsstate->conn, fsstate->cursor_number); @@ -2371,7 +2389,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - dmstate->conn = GetConnection(user, false); + dmstate->conn = GetConnection(user, false, NULL); /* Update the foreign-join-related fields. */ if (fsplan->scan.scanrelid == 0) @@ -2745,7 +2763,7 @@ estimate_path_cost_size(PlannerInfo *root, false, &retrieved_attrs, NULL); /* Get the remote estimate */ - conn = GetConnection(fpinfo->user, false); + conn = GetConnection(fpinfo->user, false, NULL); get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -3382,6 +3400,35 @@ create_cursor(ForeignScanState *node) pfree(buf.data); } +/* + * Begin an asynchronous data fetch. + * fetch_more_data must be called to fetch the results.. + */ +static void +fetch_more_data_begin(ForeignScanState *node) +{ + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PGconn *conn = fsstate->conn; + char sql[64]; + + Assert(!fsstate->conn_state->async_query_sent); + + /* + * Create the cursor synchronously. (With more state machine stuff we + * could do this asynchronously too). + */ + if (!fsstate->cursor_exists) + create_cursor(node); + + /* We will send this query, but not wait for the response. */ + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + if (PQsendQuery(conn, sql) < 0) + pgfdw_report_error(ERROR, NULL, conn, false, fsstate->query); + fsstate->conn_state->async_query_sent = true; +} + /* * Fetch some more rows from the node's cursor. */ @@ -3408,13 +3455,28 @@ fetch_more_data(ForeignScanState *node) int numrows; int i; - snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", - fsstate->fetch_size, fsstate->cursor_number); + if (!fsstate->conn_state->async_query_sent) + { + /* This is a regular synchronous fetch. */ + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); - res = pgfdw_exec_query(conn, sql); - /* On error, report the original query, not the FETCH. */ - if (PQresultStatus(res) != PGRES_TUPLES_OK) - pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + res = pgfdw_exec_query(conn, sql); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + } + else + { + /* + * The query was already sent by an earlier call to + * fetch_more_data_begin. So now we just fetch the result. + */ + res = PQgetResult(conn); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + } /* Convert the data into HeapTuples */ numrows = PQntuples(res); @@ -3441,6 +3503,15 @@ fetch_more_data(ForeignScanState *node) /* Must be EOF if we didn't get as many tuples as we asked for. */ fsstate->eof_reached = (numrows < fsstate->fetch_size); + + /* If this was the second part of an async request, we must fetch until NULL. */ + if (fsstate->conn_state->async_query_sent) + { + /* call once and raise error if not NULL as expected? */ + while (PQgetResult(conn) != NULL) + ; + fsstate->conn_state->async_query_sent = false; + } } PG_FINALLY(); { @@ -3565,7 +3636,7 @@ create_foreign_modify(EState *estate, user = GetUserMapping(userid, table->serverid); /* Open connection; report that we'll create a prepared statement. */ - fmstate->conn = GetConnection(user, true); + fmstate->conn = GetConnection(user, true, NULL); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Set up remote query information. */ @@ -4440,7 +4511,7 @@ postgresAnalyzeForeignTable(Relation relation, */ table = GetForeignTable(RelationGetRelid(relation)); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, NULL); /* * Construct command to get page count for relation. @@ -4526,7 +4597,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, table = GetForeignTable(RelationGetRelid(relation)); server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, NULL); /* * Construct cursor that retrieves whole rows from remote. @@ -4754,7 +4825,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) */ server = GetForeignServer(serverOid); mapping = GetUserMapping(GetUserId(), server->serverid); - conn = GetConnection(mapping, false); + conn = GetConnection(mapping, false, NULL); /* Don't attempt to import collation if remote server hasn't got it */ if (PQserverVersion(conn) < 90100) @@ -5559,6 +5630,41 @@ postgresGetForeignJoinPaths(PlannerInfo *root, /* XXX Consider parameterized paths for the join relation */ } +static int +postgresReady(ForeignScanState *node) +{ + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + + if (fsstate->conn_state->async_query_sent) + { + /* + * We have already started a query, for some other executor node. We + * currently can't handle two at the same time (we'd have to create + * more connections for that). + */ + return EXEC_READY_BUSY; + } + else if (fsstate->next_tuple < fsstate->num_tuples) + { + /* We already have buffered tuples. */ + return EXEC_READY_MORE; + } + else if (fsstate->eof_reached) + { + /* We have already hit the end of the scan. */ + return EXEC_READY_EOF; + } + else + { + /* + * We will start a query now, and tell the caller to wait until the + * file descriptor says we're ready and then call ExecProcNode. + */ + fetch_more_data_begin(node); + return PQsocket(fsstate->conn); + } +} + /* * Assess whether the aggregation, grouping and having operations can be pushed * down to the foreign server. As a side effect, save information we obtain in diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index eef410db39..a3c11e8d96 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -19,6 +19,15 @@ #include "nodes/pathnodes.h" #include "utils/relcache.h" +/* + * Extra control information relating to a connection. + */ +typedef struct PgFdwConnState +{ + /* Has an asynchronous query been sent? */ + bool async_query_sent; +} PgFdwConnState; + /* * FDW-specific planner information kept in RelOptInfo.fdw_private for a * postgres_fdw foreign table. For a baserel, this struct is created by @@ -129,7 +138,8 @@ extern int set_transmission_modes(void); extern void reset_transmission_modes(int nestlevel); /* in connection.c */ -extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt); +extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt, + PgFdwConnState **state); extern void ReleaseConnection(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 83971665e3..4ac211f250 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -1780,31 +1780,31 @@ INSERT INTO b(aa) VALUES('bbb'); INSERT INTO b(aa) VALUES('bbbb'); INSERT INTO b(aa) VALUES('bbbbb'); -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; UPDATE b SET aa = 'new'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; UPDATE a SET aa = 'newtoo'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; DELETE FROM a; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; @@ -1840,12 +1840,12 @@ insert into bar2 values(4,44,44); insert into bar2 values(7,77,77); explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for update; -select * from bar where f1 in (select f1 from foo) for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for share; -select * from bar where f1 in (select f1 from foo) for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; -- Check UPDATE with inherited target and an inherited source table explain (verbose, costs off) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 17a0df6978..7548deae4d 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1524,6 +1524,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
+