diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index 08daf26fdf..413d603a03 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -59,6 +59,7 @@ typedef struct ConnCacheEntry bool invalidated; /* true if reconnect is pending */ uint32 server_hashvalue; /* hash value of foreign server OID */ uint32 mapping_hashvalue; /* hash value of user mapping OID */ + PgFdwConnState state; /* extra per-connection state */ } ConnCacheEntry; /* @@ -105,7 +106,7 @@ static bool UserMappingPasswordRequired(UserMapping *user); * (not even on error), we need this flag to cue manual cleanup. */ PGconn * -GetConnection(UserMapping *user, bool will_prep_stmt) +GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state) { bool found; ConnCacheEntry *entry; @@ -197,6 +198,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt) entry->mapping_hashvalue = GetSysCacheHashValue1(USERMAPPINGOID, ObjectIdGetDatum(user->umid)); + memset(&entry->state, 0, sizeof(entry->state)); /* Now try to make the connection */ entry->conn = connect_pg_server(server, user); @@ -213,6 +215,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* Remember if caller will prepare statements */ entry->have_prep_stmt |= will_prep_stmt; + /* If caller needs access to the per-connection state, return it. 
*/ + if (state) + *state = &entry->state; + return entry->conn; } diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 90db550b92..540ed3e711 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -6973,7 +6973,7 @@ INSERT INTO a(aa) VALUES('aaaaa'); INSERT INTO b(aa) VALUES('bbb'); INSERT INTO b(aa) VALUES('bbbb'); INSERT INTO b(aa) VALUES('bbbbb'); -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+------- a | aaa @@ -7001,7 +7001,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+-------- a | aaa @@ -7029,7 +7029,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) UPDATE b SET aa = 'new'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+-------- a | aaa @@ -7057,7 +7057,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) UPDATE a SET aa = 'newtoo'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+-------- a | newtoo @@ -7085,7 +7085,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) DELETE FROM a; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+---- (0 rows) @@ -7127,23 +7127,28 @@ insert into bar2 values(3,33,33); insert into bar2 values(4,44,44); insert into bar2 values(7,77,77); explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for update; - QUERY PLAN ----------------------------------------------------------------------------------------------- +select * from bar where f1 in (select f1 from foo) order by 1 for update; + QUERY PLAN 
+----------------------------------------------------------------------------------------------------------------- LockRows Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid - -> Hash Join + -> Merge Join Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid Inner Unique: true - Hash Cond: (bar.f1 = foo.f1) - -> Append - -> Seq Scan on public.bar bar_1 + Merge Cond: (bar.f1 = foo.f1) + -> Merge Append + Sort Key: bar.f1 + -> Sort Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid + Sort Key: bar_1.f1 + -> Seq Scan on public.bar bar_1 + Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid -> Foreign Scan on public.bar2 bar_2 Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid - Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE - -> Hash + Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR UPDATE + -> Sort Output: foo.ctid, foo.f1, foo.*, foo.tableoid + Sort Key: foo.f1 -> HashAggregate Output: foo.ctid, foo.f1, foo.*, foo.tableoid Group Key: foo.f1 @@ -7153,9 +7158,9 @@ select * from bar where f1 in (select f1 from foo) for update; -> Foreign Scan on public.foo2 foo_2 Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 -(23 rows) +(28 rows) -select * from bar where f1 in (select f1 from foo) for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; f1 | f2 ----+---- 1 | 11 @@ -7165,23 +7170,28 @@ select * from bar where f1 in (select f1 from foo) for update; (4 rows) explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for share; - QUERY PLAN ----------------------------------------------------------------------------------------------- +select * from bar where f1 in (select f1 from foo) order by 1 for share; + QUERY PLAN 
+---------------------------------------------------------------------------------------------------------------- LockRows Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid - -> Hash Join + -> Merge Join Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid Inner Unique: true - Hash Cond: (bar.f1 = foo.f1) - -> Append - -> Seq Scan on public.bar bar_1 + Merge Cond: (bar.f1 = foo.f1) + -> Merge Append + Sort Key: bar.f1 + -> Sort Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid + Sort Key: bar_1.f1 + -> Seq Scan on public.bar bar_1 + Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid -> Foreign Scan on public.bar2 bar_2 Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid - Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR SHARE - -> Hash + Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR SHARE + -> Sort Output: foo.ctid, foo.f1, foo.*, foo.tableoid + Sort Key: foo.f1 -> HashAggregate Output: foo.ctid, foo.f1, foo.*, foo.tableoid Group Key: foo.f1 @@ -7191,9 +7201,9 @@ select * from bar where f1 in (select f1 from foo) for share; -> Foreign Scan on public.foo2 foo_2 Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 -(23 rows) +(28 rows) -select * from bar where f1 in (select f1 from foo) for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; f1 | f2 ----+---- 1 | 11 diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index a31abce7c9..d5f5a7d4e8 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -21,6 +21,7 @@ #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" +#include "executor/execReady.h" #include "foreign/fdwapi.h" #include "funcapi.h" #include "miscadmin.h" @@ -159,6 +160,9 @@ typedef struct PgFdwScanState 
MemoryContext temp_cxt; /* context for per-tuple temporary data */ int fetch_size; /* number of tuples per fetch */ + + /* per-connection state */ + PgFdwConnState *conn_state; } PgFdwScanState; /* @@ -392,6 +396,8 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root, RelOptInfo *output_rel, void *extra); +static int postgresReady(ForeignScanState *node); + /* * Helper functions */ @@ -419,6 +425,7 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg); static void create_cursor(ForeignScanState *node); +static void fetch_more_data_begin(ForeignScanState *node); static void fetch_more_data(ForeignScanState *node); static void close_cursor(PGconn *conn, unsigned int cursor_number); static PgFdwModifyState *create_foreign_modify(EState *estate, @@ -558,6 +565,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for upper relation push-down */ routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; + /* Support for asynchrony */ + routine->Ready = postgresReady; + PG_RETURN_POINTER(routine); } @@ -1433,7 +1443,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - fsstate->conn = GetConnection(user, false); + fsstate->conn = GetConnection(user, false, &fsstate->conn_state); /* Assign a unique ID for my cursor */ fsstate->cursor_number = GetCursorNumber(fsstate->conn); @@ -1484,6 +1494,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) &fsstate->param_flinfo, &fsstate->param_exprs, &fsstate->param_values); + fsstate->conn_state->async_query_sent = false; } /* @@ -1596,6 +1607,13 @@ postgresEndForeignScan(ForeignScanState *node) if (fsstate == NULL) return; + /* + * If we're ending before we've collected a response from an asynchronous + * query, we have to consume the response. 
+	 */ + if (fsstate->conn_state->async_query_sent) + fetch_more_data(node); + /* Close the cursor if open, to prevent accumulation of cursors */ if (fsstate->cursor_exists) close_cursor(fsstate->conn, fsstate->cursor_number); @@ -2371,7 +2389,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - dmstate->conn = GetConnection(user, false); + dmstate->conn = GetConnection(user, false, NULL); /* Update the foreign-join-related fields. */ if (fsplan->scan.scanrelid == 0) @@ -2745,7 +2763,7 @@ estimate_path_cost_size(PlannerInfo *root, false, &retrieved_attrs, NULL); /* Get the remote estimate */ - conn = GetConnection(fpinfo->user, false); + conn = GetConnection(fpinfo->user, false, NULL); get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -3382,6 +3400,35 @@ create_cursor(ForeignScanState *node) pfree(buf.data); } +/* + * Begin an asynchronous data fetch. + * fetch_more_data must be called to fetch the results. + */ +static void +fetch_more_data_begin(ForeignScanState *node) +{ + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PGconn *conn = fsstate->conn; + char sql[64]; + + Assert(!fsstate->conn_state->async_query_sent); + + /* + * Create the cursor synchronously. (With more state machine stuff we + * could do this asynchronously too). + */ + if (!fsstate->cursor_exists) + create_cursor(node); + + /* We will send this query, but not wait for the response. */ + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + /* PQsendQuery returns 0 (not a negative value) on failure. */ + if (!PQsendQuery(conn, sql)) + pgfdw_report_error(ERROR, NULL, conn, false, fsstate->query); + fsstate->conn_state->async_query_sent = true; +} + /* + * Fetch some more rows from the node's cursor. 
*/ @@ -3408,13 +3455,28 @@ fetch_more_data(ForeignScanState *node) int numrows; int i; - snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", - fsstate->fetch_size, fsstate->cursor_number); + if (!fsstate->conn_state->async_query_sent) + { + /* This is a regular synchronous fetch. */ + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); - res = pgfdw_exec_query(conn, sql); - /* On error, report the original query, not the FETCH. */ - if (PQresultStatus(res) != PGRES_TUPLES_OK) - pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + res = pgfdw_exec_query(conn, sql); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + } + else + { + /* + * The query was already sent by an earlier call to + * fetch_more_data_begin. So now we just fetch the result. + */ + res = PQgetResult(conn); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + } /* Convert the data into HeapTuples */ numrows = PQntuples(res); @@ -3441,6 +3503,15 @@ fetch_more_data(ForeignScanState *node) /* Must be EOF if we didn't get as many tuples as we asked for. */ fsstate->eof_reached = (numrows < fsstate->fetch_size); + + /* If this was the second part of an async request, we must fetch until NULL. */ + if (fsstate->conn_state->async_query_sent) + { + PQclear(res); /* done with the tuples result; reuse res to drain */ + while ((res = PQgetResult(conn)) != NULL) + PQclear(res); + fsstate->conn_state->async_query_sent = false; + } } PG_FINALLY(); { @@ -3565,7 +3636,7 @@ create_foreign_modify(EState *estate, user = GetUserMapping(userid, table->serverid); /* Open connection; report that we'll create a prepared statement. 
*/ - fmstate->conn = GetConnection(user, true); + fmstate->conn = GetConnection(user, true, NULL); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Set up remote query information. */ @@ -4440,7 +4511,7 @@ postgresAnalyzeForeignTable(Relation relation, */ table = GetForeignTable(RelationGetRelid(relation)); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, NULL); /* * Construct command to get page count for relation. @@ -4526,7 +4597,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, table = GetForeignTable(RelationGetRelid(relation)); server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, NULL); /* * Construct cursor that retrieves whole rows from remote. @@ -4754,7 +4825,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) */ server = GetForeignServer(serverOid); mapping = GetUserMapping(GetUserId(), server->serverid); - conn = GetConnection(mapping, false); + conn = GetConnection(mapping, false, NULL); /* Don't attempt to import collation if remote server hasn't got it */ if (PQserverVersion(conn) < 90100) @@ -5559,6 +5630,41 @@ postgresGetForeignJoinPaths(PlannerInfo *root, /* XXX Consider parameterized paths for the join relation */ } +static int +postgresReady(ForeignScanState *node) +{ + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + + if (fsstate->conn_state->async_query_sent) + { + /* + * We have already started a query, for some other executor node. We + * currently can't handle two at the same time (we'd have to create + * more connections for that). + */ + return EXEC_READY_BUSY; + } + else if (fsstate->next_tuple < fsstate->num_tuples) + { + /* We already have buffered tuples. 
*/ + return EXEC_READY_MORE; + } + else if (fsstate->eof_reached) + { + /* We have already hit the end of the scan. */ + return EXEC_READY_EOF; + } + else + { + /* + * We will start a query now, and tell the caller to wait until the + * file descriptor says we're ready and then call ExecProcNode. + */ + fetch_more_data_begin(node); + return PQsocket(fsstate->conn); + } +} + /* * Assess whether the aggregation, grouping and having operations can be pushed * down to the foreign server. As a side effect, save information we obtain in diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index eef410db39..a3c11e8d96 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -19,6 +19,15 @@ #include "nodes/pathnodes.h" #include "utils/relcache.h" +/* + * Extra control information relating to a connection. + */ +typedef struct PgFdwConnState +{ + /* Has an asynchronous query been sent? */ + bool async_query_sent; +} PgFdwConnState; + /* * FDW-specific planner information kept in RelOptInfo.fdw_private for a * postgres_fdw foreign table. 
For a baserel, this struct is created by @@ -129,7 +138,8 @@ extern int set_transmission_modes(void); extern void reset_transmission_modes(int nestlevel); /* in connection.c */ -extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt); +extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt, + PgFdwConnState **state); extern void ReleaseConnection(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 83971665e3..4ac211f250 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -1780,31 +1780,31 @@ INSERT INTO b(aa) VALUES('bbb'); INSERT INTO b(aa) VALUES('bbbb'); INSERT INTO b(aa) VALUES('bbbbb'); -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; UPDATE b SET aa = 'new'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; UPDATE a SET aa = 'newtoo'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; DELETE FROM a; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; @@ -1840,12 +1840,12 @@ insert into bar2 values(4,44,44); insert into bar2 values(7,77,77); explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for update; 
-select * from bar where f1 in (select f1 from foo) for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for share; -select * from bar where f1 in (select f1 from foo) for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; -- Check UPDATE with inherited target and an inherited source table explain (verbose, costs off) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 17a0df6978..7548deae4d 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1524,6 +1524,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser + + AppendReady + Waiting for a subplan of Append to be ready. + BackupWaitWalArchive Waiting for WAL files required for a backup to be successfully diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index 01b7b926bf..25cf988606 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -73,6 +73,7 @@ #include "postgres.h" #include "executor/executor.h" +#include "executor/execReady.h" #include "executor/nodeAgg.h" #include "executor/nodeAppend.h" #include "executor/nodeBitmapAnd.h" @@ -741,6 +742,30 @@ ExecEndNode(PlanState *node) } } +/* + * ExecReady + * + * Check whether the node would be able to produce a new tuple without + * blocking. EXEC_READY_MORE means a tuple can be returned by ExecProcNode + * immediately without waiting. EXEC_READY_EOF means there are no further + * tuples to consume. EXEC_READY_UNSUPPORTED means that this node doesn't + * support asynchronous interaction. EXEC_READY_BUSY means that this node + * currently can't provide asynchronous service. 
Any other value is a file + * descriptor which can be used to wait until the node is ready to produce a + * tuple. + */ +int +ExecReady(PlanState *node) +{ + switch (nodeTag(node)) + { + case T_ForeignScanState: + return ExecForeignScanReady((ForeignScanState *) node); + default: + return EXEC_READY_UNSUPPORTED; + } +} + /* * ExecShutdownNode * diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 88919e62fa..1c0935a079 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -59,8 +59,11 @@ #include "executor/execdebug.h" #include "executor/execPartition.h" +#include "executor/execReady.h" #include "executor/nodeAppend.h" #include "miscadmin.h" +#include "pgstat.h" +#include "storage/latch.h" /* Shared state for parallel-aware Append. */ struct ParallelAppendState @@ -219,9 +222,207 @@ ExecInitAppend(Append *node, EState *estate, int eflags) /* For parallel query, this will be overridden later. */ appendstate->choose_next_subplan = choose_next_subplan_locally; + /* + * Initially we consider all subplans to be potentially asynchronous. + */ + appendstate->asyncplans = (PlanState **) palloc(nplans * sizeof(PlanState *)); + appendstate->asyncfds = (int *) palloc0(nplans * sizeof(int)); + appendstate->nasyncplans = nplans; + memcpy(appendstate->asyncplans, appendstate->appendplans, nplans * sizeof(PlanState *)); + appendstate->lastreadyplan = 0; + return appendstate; } +/* + * Forget about an asynchronous subplan, given an async subplan index. Return + * the index of the next subplan. + */ +static int +forget_async_subplan(AppendState *node, int i) +{ + int last = node->nasyncplans - 1; + + if (i == last) + { + /* This was the last subplan, forget it and move to first. */ + i = 0; + if (node->lastreadyplan == last) + node->lastreadyplan = 0; + } + else + { + /* + * Move the last one here (cheaper than memmov'ing the whole array + * down and we don't care about the order). 
+ */ + node->asyncplans[i] = node->asyncplans[last]; + node->asyncfds[i] = node->asyncfds[last]; + } + --node->nasyncplans; + + return i; +} + +/* + * Wait for the first asynchronous subplan's file descriptor to be ready to + * read or error, and then ask it for a tuple. + * + * This is called by append_next_async when every async subplan has provided a + * file descriptor to wait on, so we must begin waiting. + */ +static TupleTableSlot * +append_next_async_wait(AppendState *node) +{ + while (node->nasyncplans > 0) + { + WaitEventSet *set; + WaitEvent event; + int i; + + /* + * For now there is no facility to remove fds from WaitEventSets when + * they are no longer interesting, so we allocate, populate, free + * every time, a la select(). If we had RemoveWaitEventFromSet, we + * could use the same WaitEventSet object for the life of the append + * node, and add/remove as we go, a la epoll/kqueue. + * + * Note: We could make a single call to WaitEventSetWait and have a + * big enough output event buffer to learn about readiness on all + * interesting sockets and loop over those, but one implementation can + * only tell us about a single socket at a time, so we need to be + * prepared to call WaitEventSetWait repeatedly. + */ + set = CreateWaitEventSet(CurrentMemoryContext, node->nasyncplans + 1); + AddWaitEventToSet(set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, NULL, + NULL); + for (i = 0; i < node->nasyncplans; ++i) + { + Assert(node->asyncfds[i] > 0); + AddWaitEventToSet(set, WL_SOCKET_READABLE, node->asyncfds[i], NULL, + NULL); + } + i = WaitEventSetWait(set, -1, &event, 1, WAIT_EVENT_APPEND_READY); + Assert(i > 0); + FreeWaitEventSet(set); + + if (event.events & WL_SOCKET_READABLE) + { + /* Linear search for the node that told us to wait for this fd. 
*/ + for (i = 0; i < node->nasyncplans; ++i) + { + if (event.fd == node->asyncfds[i]) + { + TupleTableSlot *result; + + /* + * We assume that because the fd is ready, it can produce + * a tuple now, which is not perfect. An improvement + * would be if it could say 'not yet, I'm still not + * ready', so eg postgres_fdw could PQconsumeInput and + * then say 'I need more input'. + */ + result = ExecProcNode(node->asyncplans[i]); + if (!TupIsNull(result)) + { + /* + * Remember this plan so that append_next_async will + * keep trying this subplan first until it stops + * feeding us buffered tuples. + */ + node->lastreadyplan = i; + /* We can stop waiting for this fd. */ + node->asyncfds[i] = 0; + return result; + } + else + { + /* + * This subplan has reached EOF. We'll go back and + * wait for another one. + */ + forget_async_subplan(node, i); + break; + } + } + } + } + } + /* + * We visited every ready subplan, tried to pull a tuple, and they all + * reported EOF. There is no more async data available. + */ + return NULL; +} + +/* + * Fetch the next tuple available from any asynchronous subplan. If none can + * provide a tuple immediately, wait for the first one that is ready to + * provide a tuple. Return NULL when there are no more tuples available. + */ +static TupleTableSlot * +append_next_async(AppendState *node) +{ + int count; + int i; + + /* + * We'll start our scan of subplans at the last one that was able to give + * us a tuple, if there was one. It may be able to give us a new tuple + * straight away so we can leave early. + */ + i = node->lastreadyplan; + + /* Loop until we've visited each potentially async subplan. */ + for (count = node->nasyncplans; count > 0; --count) + { + /* + * If we don't already have a file descriptor to wait on for this + * subplan, see if it is ready. + */ + if (node->asyncfds[i] == 0) + { + int ready = ExecReady(node->asyncplans[i]); + + switch (ready) + { + case EXEC_READY_MORE: + /* The node has a buffered tuple for us. 
*/ + return ExecProcNode(node->asyncplans[i]); + + case EXEC_READY_UNSUPPORTED: + case EXEC_READY_EOF: + case EXEC_READY_BUSY: + /* This subplan can't give us anything asynchronously. */ + i = forget_async_subplan(node, i); + continue; + + default: + /* We have a new file descriptor to wait for. */ + Assert(ready > 0); + node->asyncfds[i] = ready; + node->lastreadyplan = 0; + break; + } + } + + /* Move on to the next plan (circular). */ + i = (i + 1) % node->nasyncplans; + } + + /* We might have removed all subplans; if so we can leave now. */ + if (node->nasyncplans == 0) + return NULL; + + /* + * If we reached here, then all remaining async subplans have given us a + * file descriptor to wait for. So do that, and pull a tuple as soon as + * one is ready. + */ + return append_next_async_wait(node); +} + + /* ---------------------------------------------------------------- * ExecAppend * @@ -233,6 +434,16 @@ ExecAppend(PlanState *pstate) { AppendState *node = castNode(AppendState, pstate); + /* First, drain all asynchronous subplans as they become ready. 
*/ + if (node->nasyncplans > 0) + { + TupleTableSlot *result = append_next_async(node); + + if (!TupIsNull(result)) + return result; + } + Assert(node->nasyncplans == 0); + if (node->as_whichplan < 0) { /* Nothing to do if there are no subplans */ @@ -395,6 +606,9 @@ ExecAppendInitializeDSM(AppendState *node, node->as_pstate = pstate; node->choose_next_subplan = choose_next_subplan_for_leader; + + /* TODO: for now disable async when running in parallel */ + node->nasyncplans = 0; } /* ---------------------------------------------------------------- diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 513471ab9b..fe32cbc123 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -23,6 +23,7 @@ #include "postgres.h" #include "executor/executor.h" +#include "executor/execReady.h" #include "executor/nodeForeignscan.h" #include "foreign/fdwapi.h" #include "utils/memutils.h" @@ -384,3 +385,21 @@ ExecShutdownForeignScan(ForeignScanState *node) if (fdwroutine->ShutdownForeignScan) fdwroutine->ShutdownForeignScan(node); } + +/* ---------------------------------------------------------------- + * ExecForeignScanReady + * + * Checks if the foreign scan can emit data asynchronously + * using socket readiness as an indicator. 
+ * ---------------------------------------------------------------- + */ +int +ExecForeignScanReady(ForeignScanState *node) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + + if (fdwroutine->Ready) + return fdwroutine->Ready(node); + else + return EXEC_READY_UNSUPPORTED; +} diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 8116b23614..88f0d376c3 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3752,6 +3752,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) switch (w) { + case WAIT_EVENT_APPEND_READY: + event_name = "AppendReady"; + break; case WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE: event_name = "BackupWaitWalArchive"; break; diff --git a/src/include/executor/execReady.h b/src/include/executor/execReady.h index e69de29bb2..01410ea7bc 100644 --- a/src/include/executor/execReady.h +++ b/src/include/executor/execReady.h @@ -0,0 +1,31 @@ +/*------------------------------------------------------------------------- + * + * execReady.h + * Values used by FDW and the executor for async tuple iteration. + * + * Portions Copyright (c) 2019, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/include/executor/execReady.h + * + *------------------------------------------------------------------------- + */ +#ifndef EXECREADY_H +#define EXECREADY_H + +/* + * Asynchronous processing is not currently available (because an asynchronous + * request is already in progress). + */ +#define EXEC_READY_BUSY -3 + +/* There are no more tuples. */ +#define EXEC_READY_EOF -2 + +/* This FDW or executor node does not support asynchronous processing. */ +#define EXEC_READY_UNSUPPORTED -1 + +/* More tuples are available immediately without waiting. 
*/ +#define EXEC_READY_MORE 0 + +#endif diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 415e117407..243254e61c 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -224,6 +224,7 @@ extern void EvalPlanQualEnd(EPQState *epqstate); extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags); extern void ExecSetExecProcNode(PlanState *node, ExecProcNodeMtd function); extern Node *MultiExecProcNode(PlanState *node); +extern int ExecReady(PlanState *node); extern void ExecEndNode(PlanState *node); extern bool ExecShutdownNode(PlanState *node); extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node); diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h index 326d713ebf..151490572d 100644 --- a/src/include/executor/nodeForeignscan.h +++ b/src/include/executor/nodeForeignscan.h @@ -30,5 +30,6 @@ extern void ExecForeignScanReInitializeDSM(ForeignScanState *node, extern void ExecForeignScanInitializeWorker(ForeignScanState *node, ParallelWorkerContext *pwcxt); extern void ExecShutdownForeignScan(ForeignScanState *node); +extern int ExecForeignScanReady(ForeignScanState *node); #endif /* NODEFOREIGNSCAN_H */ diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 95556dfb15..253288e3c0 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -170,6 +170,8 @@ typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root, List *fdw_private, RelOptInfo *child_rel); +typedef int (*Ready_function) (ForeignScanState *node); + /* * FdwRoutine is the struct returned by a foreign-data wrapper's handler * function. It provides pointers to the callback functions needed by the @@ -246,6 +248,9 @@ typedef struct FdwRoutine /* Support functions for path reparameterization. 
*/ ReparameterizeForeignPathByChild_function ReparameterizeForeignPathByChild; + + /* Support functions for asynchronous processing */ + Ready_function Ready; } FdwRoutine; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 0b42dd6f94..70d112cced 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1231,6 +1231,11 @@ struct AppendState struct PartitionPruneState *as_prune_state; Bitmapset *as_valid_subplans; bool (*choose_next_subplan) (AppendState *); + + PlanState **asyncplans; + int *asyncfds; + int nasyncplans; + int lastreadyplan; }; /* ---------------- diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 807a9c1edf..a560797b82 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -846,6 +846,7 @@ typedef enum */ typedef enum { + WAIT_EVENT_APPEND_READY, WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE = PG_WAIT_IPC, WAIT_EVENT_BGWORKER_SHUTDOWN, WAIT_EVENT_BGWORKER_STARTUP,