diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c index ee0b4acf0b..3ecb8e1e4f 100644 --- a/contrib/postgres_fdw/connection.c +++ b/contrib/postgres_fdw/connection.c @@ -62,6 +62,7 @@ typedef struct ConnCacheEntry Oid serverid; /* foreign server OID used to get server name */ uint32 server_hashvalue; /* hash value of foreign server OID */ uint32 mapping_hashvalue; /* hash value of user mapping OID */ + PgFdwConnState state; /* extra per-connection state */ } ConnCacheEntry; /* @@ -117,7 +118,7 @@ static bool disconnect_cached_connections(Oid serverid); * (not even on error), we need this flag to cue manual cleanup. */ PGconn * -GetConnection(UserMapping *user, bool will_prep_stmt) +GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state) { bool found; bool retry = false; @@ -264,6 +265,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt) /* Remember if caller will prepare statements */ entry->have_prep_stmt |= will_prep_stmt; + /* If caller needs access to the per-connection state, return it. */ + if (state) + *state = &entry->state; + return entry->conn; } @@ -291,6 +296,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user) entry->mapping_hashvalue = GetSysCacheHashValue1(USERMAPPINGOID, ObjectIdGetDatum(user->umid)); + memset(&entry->state, 0, sizeof(entry->state)); /* Now try to make the connection */ entry->conn = connect_pg_server(server, user); diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 60c7e115d6..05428ee018 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -7021,7 +7021,7 @@ INSERT INTO a(aa) VALUES('aaaaa'); INSERT INTO b(aa) VALUES('bbb'); INSERT INTO b(aa) VALUES('bbbb'); INSERT INTO b(aa) VALUES('bbbbb'); -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+------- a | aaa @@ -7049,7 +7049,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+-------- a | aaa @@ -7077,7 +7077,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) UPDATE b SET aa = 'new'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+-------- a | aaa @@ -7105,7 +7105,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) UPDATE a SET aa = 'newtoo'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+-------- a | newtoo @@ -7133,7 +7133,7 @@ SELECT tableoid::regclass, * FROM ONLY a; (3 rows) DELETE FROM a; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; tableoid | aa ----------+---- (0 rows) @@ -7175,23 +7175,28 @@ insert into bar2 values(3,33,33); insert into bar2 values(4,44,44); insert into bar2 values(7,77,77); explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for update; - QUERY PLAN ----------------------------------------------------------------------------------------------- +select * from bar where f1 in (select f1 from foo) order by 1 for update; + QUERY PLAN +----------------------------------------------------------------------------------------------------------------- LockRows Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid - -> Hash Join + -> Merge Join Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid Inner Unique: true - Hash Cond: (bar.f1 = foo.f1) - -> Append - -> Seq Scan on public.bar bar_1 + Merge Cond: (bar.f1 = foo.f1) + -> Merge Append + Sort Key: bar.f1 + -> Sort Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid + Sort Key: bar_1.f1 + -> Seq Scan on public.bar bar_1 + Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid -> Foreign Scan on public.bar2 bar_2 Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid - Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE - -> Hash + Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR UPDATE + -> Sort Output: foo.ctid, foo.f1, foo.*, foo.tableoid + Sort Key: foo.f1 -> HashAggregate Output: foo.ctid, foo.f1, foo.*, foo.tableoid Group Key: foo.f1 @@ -7201,9 +7206,9 @@ select * from bar where f1 in (select f1 from foo) for update; -> Foreign Scan on public.foo2 foo_2 Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 -(23 rows) +(28 rows) -select * from bar where f1 in (select f1 from foo) for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; f1 | f2 ----+---- 1 | 11 @@ -7213,23 +7218,28 @@ select * from bar where f1 in (select f1 from foo) for update; (4 rows) explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for share; - QUERY PLAN ----------------------------------------------------------------------------------------------- +select * from bar where f1 in (select f1 from foo) order by 1 for share; + QUERY PLAN +---------------------------------------------------------------------------------------------------------------- LockRows Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid - -> Hash Join + -> Merge Join Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid Inner Unique: true - Hash Cond: (bar.f1 = foo.f1) - -> Append - -> Seq Scan on public.bar bar_1 + Merge Cond: (bar.f1 = foo.f1) + -> Merge Append + Sort Key: bar.f1 + -> Sort Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid + Sort Key: bar_1.f1 + -> Seq Scan on public.bar bar_1 + Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid -> Foreign Scan on public.bar2 bar_2 Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid - Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR SHARE - -> Hash + Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR SHARE + -> Sort Output: foo.ctid, foo.f1, foo.*, foo.tableoid + Sort Key: foo.f1 -> HashAggregate Output: foo.ctid, foo.f1, foo.*, foo.tableoid Group Key: foo.f1 @@ -7239,9 +7249,9 @@ select * from bar where f1 in (select f1 from foo) for share; -> Foreign Scan on public.foo2 foo_2 Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1 -(23 rows) +(28 rows) -select * from bar where f1 in (select f1 from foo) for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; f1 | f2 ----+---- 1 | 11 diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 368997d9d1..11b19ae1ef 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -21,6 +21,7 @@ #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" +#include "executor/execAsync.h" #include "foreign/fdwapi.h" #include "funcapi.h" #include "miscadmin.h" @@ -37,9 +38,11 @@ #include "optimizer/tlist.h" #include "parser/parsetree.h" #include "postgres_fdw.h" +#include "storage/latch.h" #include "utils/builtins.h" #include "utils/float.h" #include "utils/guc.h" +#include "utils/hsearch.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/rel.h" @@ -159,6 +162,11 @@ typedef struct PgFdwScanState int fetch_ct_2; /* Min(# of fetches done, 2) */ bool eof_reached; /* true if last fetch reached EOF */ + /* for asynchronous execution */ + Oid umid; /* Oid of user mapping */ + PgFdwConnState *conn_state; /* extra per-connection state */ + ForeignScanState *next_node; /* next ForeignScan node to activate */ + /* working memory contexts */ MemoryContext batch_cxt; /* context holding current batch of tuples */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ @@ -408,6 +416,12 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *output_rel, void *extra); +static bool postgresIsForeignPathAsyncCapable(ForeignPath *path); +static bool postgresReconsiderAsyncForeignScan(ForeignScanState *node, + AsyncContext *acxt); +static void postgresForeignAsyncRequest(AsyncRequest *areq); +static void postgresForeignAsyncConfigureWait(AsyncRequest *areq); +static void postgresForeignAsyncNotify(AsyncRequest *areq); /* * Helper functions @@ -435,7 +449,11 @@ static void adjust_foreign_grouping_path_cost(PlannerInfo *root, static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec, EquivalenceMember *em, void *arg); +static UserMapping *get_user_mapping(EState *estate, ForeignScan *fsplan); +static void record_foreign_scan_info(EState *estate, ForeignScanState *node, + UserMapping *user); static void create_cursor(ForeignScanState *node); +static void fetch_more_data_begin(ForeignScanState *node); static void fetch_more_data(ForeignScanState *node); static void close_cursor(PGconn *conn, unsigned int cursor_number); static PgFdwModifyState *create_foreign_modify(EState *estate, @@ -491,6 +509,7 @@ static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, double *totaldeadrows); static void analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate); +static void request_tuple_asynchronously(AsyncRequest *areq); static HeapTuple make_tuple_from_result_row(PGresult *res, int row, Relation rel, @@ -583,6 +602,13 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for upper relation push-down */ routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; + /* Support functions for asynchronous execution */ + routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable; + routine->ReconsiderAsyncForeignScan = postgresReconsiderAsyncForeignScan; + routine->ForeignAsyncRequest = postgresForeignAsyncRequest; + routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait; + routine->ForeignAsyncNotify = postgresForeignAsyncNotify; + PG_RETURN_POINTER(routine); } @@ -1417,19 +1443,40 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) { ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; EState *estate = node->ss.ps.state; + bool asyncPlan = estate->es_plannedstmt->asyncPlan; PgFdwScanState *fsstate; - RangeTblEntry *rte; - Oid userid; - ForeignTable *table; UserMapping *user; - int rtindex; int numParams; /* - * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. + * No need to work hard in EXPLAIN (no ANALYZE) case. In that case, + * node->fdw_state stays NULL; or node->fdw_state->conn stays NULL. */ if (eflags & EXEC_FLAG_EXPLAIN_ONLY) + { + /* Do nothing if the query plan tree has no async-aware Appends. */ + if (!asyncPlan) + return; + + /* Get info about user mapping. */ + user = get_user_mapping(estate, fsplan); + + /* Record the information on the ForeignScan node in the EState. */ + record_foreign_scan_info(estate, node, user); + + /* + * If the ForeignScan node is async-capable, save the user mapping + * OID in node->fdw_state for use later. + */ + if (node->ss.ps.async_capable) + { + fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState)); + node->fdw_state = (void *) fsstate; + fsstate->umid = user->umid; + } + return; + } /* * We'll save private state in node->fdw_state. @@ -1437,28 +1484,27 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState)); node->fdw_state = (void *) fsstate; - /* - * Identify which user to do the remote access as. This should match what - * ExecCheckRTEPerms() does. In case of a join or aggregate, use the - * lowest-numbered member RTE as a representative; we would get the same - * result from any. - */ - if (fsplan->scan.scanrelid > 0) - rtindex = fsplan->scan.scanrelid; - else - rtindex = bms_next_member(fsplan->fs_relids, -1); - rte = exec_rt_fetch(rtindex, estate); - userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); + /* Get info about user mapping. */ + user = get_user_mapping(estate, fsplan); - /* Get info about foreign table. */ - table = GetForeignTable(rte->relid); - user = GetUserMapping(userid, table->serverid); + if (asyncPlan) + { + /* Record the information on the ForeignScan node in the EState. */ + record_foreign_scan_info(estate, node, user); + + /* + * If the ForeignScan node is async-capable, save the user mapping + * OID in node->fdw_state for use later. + */ + if (node->ss.ps.async_capable) + fsstate->umid = user->umid; + } /* * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - fsstate->conn = GetConnection(user, false); + fsstate->conn = GetConnection(user, false, &fsstate->conn_state); /* Assign a unique ID for my cursor */ fsstate->cursor_number = GetCursorNumber(fsstate->conn); @@ -1509,6 +1555,11 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) &fsstate->param_flinfo, &fsstate->param_exprs, &fsstate->param_values); + + /* Initialize async state */ + fsstate->conn_state->activated = NULL; + fsstate->conn_state->async_query_sent = false; + fsstate->next_node = NULL; } /* @@ -1523,8 +1574,10 @@ postgresIterateForeignScan(ForeignScanState *node) TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; /* - * If this is the first call after Begin or ReScan, we need to create the - * cursor on the remote side. + * In sync mode, if this is the first call after Begin or ReScan, we need + * to create the cursor on the remote side. In async mode, we would have + * aready created the cursor before we get here, even if this is the first + * call after Begin or ReScan. */ if (!fsstate->cursor_exists) create_cursor(node); @@ -1534,6 +1587,9 @@ postgresIterateForeignScan(ForeignScanState *node) */ if (fsstate->next_tuple >= fsstate->num_tuples) { + /* In async mode, just clear tuple slot. */ + if (node->ss.ps.async_capable) + return ExecClearTuple(slot); /* No point in another fetch if we already detected EOF, though. */ if (!fsstate->eof_reached) fetch_more_data(node); @@ -1563,6 +1619,14 @@ postgresReScanForeignScan(ForeignScanState *node) char sql[64]; PGresult *res; + /* Reset async state */ + if (node->ss.ps.async_capable) + { + fsstate->conn_state->activated = NULL; + fsstate->conn_state->async_query_sent = false; + fsstate->next_node = NULL; + } + /* If we haven't created the cursor yet, nothing to do. */ if (!fsstate->cursor_exists) return; @@ -1617,10 +1681,21 @@ postgresEndForeignScan(ForeignScanState *node) { PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; - /* if fsstate is NULL, we are in EXPLAIN; nothing to do */ - if (fsstate == NULL) + /* + * if fsstate is NULL or if fsstate->conn is NULL, we are in EXPLAIN; + * nothing to do + */ + if (fsstate == NULL || fsstate->conn == NULL) return; + /* + * If we're ending before we've collected a response from an asynchronous + * query, we have to consume the response. + */ + if (fsstate->conn_state->activated == node && + fsstate->conn_state->async_query_sent) + fetch_more_data(node); + /* Close the cursor if open, to prevent accumulation of cursors */ if (fsstate->cursor_exists) close_cursor(fsstate->conn, fsstate->cursor_number); @@ -2491,7 +2566,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - dmstate->conn = GetConnection(user, false); + dmstate->conn = GetConnection(user, false, NULL); /* Update the foreign-join-related fields. */ if (fsplan->scan.scanrelid == 0) @@ -2872,7 +2947,7 @@ estimate_path_cost_size(PlannerInfo *root, false, &retrieved_attrs, NULL); /* Get the remote estimate */ - conn = GetConnection(fpinfo->user, false); + conn = GetConnection(fpinfo->user, false, NULL); get_remote_estimate(sql.data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -3428,6 +3503,53 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, return true; } +static UserMapping * +get_user_mapping(EState *estate, ForeignScan *fsplan) +{ + int rtindex; + RangeTblEntry *rte; + Oid userid; + ForeignTable *table; + + /* + * Identify which user to do the remote access as. This should match what + * ExecCheckRTEPerms() does. In case of a join or aggregate, use the + * lowest-numbered member RTE as a representative; we would get the same + * result from any. + */ + if (fsplan->scan.scanrelid > 0) + rtindex = fsplan->scan.scanrelid; + else + rtindex = bms_next_member(fsplan->fs_relids, -1); + rte = exec_rt_fetch(rtindex, estate); + userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); + + /* Get info about foreign table. */ + table = GetForeignTable(rte->relid); + + return GetUserMapping(userid, table->serverid); +} + +static void +record_foreign_scan_info(EState *estate, ForeignScanState *node, + UserMapping *user) +{ + HTAB *htab = estate->es_foreign_scan_hash; + int fsplanid = node->ss.ps.plan->plan_node_id; + bool found; + ForeignScanHashEntry *entry; + + /* Find or create hash table entry for the user mapping. */ + Assert(htab); + entry = (ForeignScanHashEntry *) hash_search(htab, &user->umid, + HASH_ENTER, &found); + + if (!found) + entry->fsplanids = bms_make_singleton(fsplanid); + else + entry->fsplanids = bms_add_member(entry->fsplanids, fsplanid); +} + /* * Create cursor for node's query with current parameter values. */ @@ -3500,6 +3622,34 @@ create_cursor(ForeignScanState *node) pfree(buf.data); } +/* + * Begin an asynchronous data fetch. + * fetch_more_data must be called to fetch the results.. + */ +static void +fetch_more_data_begin(ForeignScanState *node) +{ + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + PGconn *conn = fsstate->conn; + char sql[64]; + + Assert(fsstate->conn_state->activated == node); + Assert(!fsstate->conn_state->async_query_sent); + + /* Create the cursor synchronously. */ + if (!fsstate->cursor_exists) + create_cursor(node); + + /* We will send this query, but not wait for the response. */ + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + if (PQsendQuery(conn, sql) < 0) + pgfdw_report_error(ERROR, NULL, conn, false, fsstate->query); + + fsstate->conn_state->async_query_sent = true; +} + /* * Fetch some more rows from the node's cursor. */ @@ -3522,17 +3672,36 @@ fetch_more_data(ForeignScanState *node) PG_TRY(); { PGconn *conn = fsstate->conn; - char sql[64]; int numrows; int i; - snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", - fsstate->fetch_size, fsstate->cursor_number); + if (node->ss.ps.async_capable) + { + Assert(fsstate->conn_state->activated == node); + Assert(fsstate->conn_state->async_query_sent); + + /* + * The query was already sent by an earlier call to + * fetch_more_data_begin. So now we just fetch the result. + */ + res = PQgetResult(conn); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + } + else + { + char sql[64]; - res = pgfdw_exec_query(conn, sql); - /* On error, report the original query, not the FETCH. */ - if (PQresultStatus(res) != PGRES_TUPLES_OK) - pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + /* This is a regular synchronous fetch. */ + snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", + fsstate->fetch_size, fsstate->cursor_number); + + res = pgfdw_exec_query(conn, sql); + /* On error, report the original query, not the FETCH. */ + if (PQresultStatus(res) != PGRES_TUPLES_OK) + pgfdw_report_error(ERROR, res, conn, false, fsstate->query); + } /* Convert the data into HeapTuples */ numrows = PQntuples(res); @@ -3559,6 +3728,15 @@ fetch_more_data(ForeignScanState *node) /* Must be EOF if we didn't get as many tuples as we asked for. */ fsstate->eof_reached = (numrows < fsstate->fetch_size); + + /* If this was the second part of an async request, we must fetch until NULL. */ + if (node->ss.ps.async_capable) + { + /* call once and raise error if not NULL as expected? */ + while (PQgetResult(conn) != NULL) + ; + fsstate->conn_state->async_query_sent = false; + } } PG_FINALLY(); { @@ -3684,7 +3862,7 @@ create_foreign_modify(EState *estate, user = GetUserMapping(userid, table->serverid); /* Open connection; report that we'll create a prepared statement. */ - fmstate->conn = GetConnection(user, true); + fmstate->conn = GetConnection(user, true, NULL); fmstate->p_name = NULL; /* prepared statement not made yet */ /* Set up remote query information. */ @@ -4618,7 +4796,7 @@ postgresAnalyzeForeignTable(Relation relation, */ table = GetForeignTable(RelationGetRelid(relation)); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, NULL); /* * Construct command to get page count for relation. @@ -4704,7 +4882,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, table = GetForeignTable(RelationGetRelid(relation)); server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, table->serverid); - conn = GetConnection(user, false); + conn = GetConnection(user, false, NULL); /* * Construct cursor that retrieves whole rows from remote. @@ -4932,7 +5110,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) */ server = GetForeignServer(serverOid); mapping = GetUserMapping(GetUserId(), server->serverid); - conn = GetConnection(mapping, false); + conn = GetConnection(mapping, false, NULL); /* Don't attempt to import collation if remote server hasn't got it */ if (PQserverVersion(conn) < 90100) @@ -6479,6 +6657,221 @@ add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel, add_path(final_rel, (Path *) final_path); } +/* + * postgresIsForeignPathAsyncCapable + * Check whether a given ForeignPath node is async-capable. + */ +static bool +postgresIsForeignPathAsyncCapable(ForeignPath *path) +{ + return true; +} + +/* + * postgresReconsiderAsyncForeignScan + * Re-examine a given ForeignScan node that was planned as async-capable. + */ +static bool +postgresReconsiderAsyncForeignScan(ForeignScanState *node, AsyncContext *acxt) +{ + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + EState *estate = node->ss.ps.state; + HTAB *htab = estate->es_foreign_scan_hash; + bool found; + ForeignScanHashEntry *entry; + AppendState *requestor = (AppendState *) acxt->requestor; + Bitmapset *asyncplanids = requestor->as_asyncplanids; + Bitmapset *fsplanids; + + /* Find hash table entry for the ForeignScan node. */ + Assert(htab); + entry = (ForeignScanHashEntry *) hash_search(htab, &fsstate->umid, + HASH_FIND, &found); + Assert(found); + + fsplanids = entry->fsplanids; + Assert(bms_is_member(node->ss.ps.plan->plan_node_id, fsplanids)); + + /* + * If the connection used for the ForeignScan node is used in other parts + * of the query plan tree except async subplans of the parent Append node, + * disable async execution of the ForeignScan node. + */ + if (!bms_is_subset(fsplanids, asyncplanids)) + return false; + + /* + * If the subplans of the Append node are all async-capable, and use the + * same connection, then we won't execute them asynchronously. + */ + if (requestor->as_nasyncplans == requestor->as_nplans && + !bms_nonempty_difference(asyncplanids, fsplanids)) + return false; + + return true; +} + +/* + * postgresForeignAsyncRequest + * Asynchronously request next tuple from a foreign PostgreSQL table. + */ +static void +postgresForeignAsyncRequest(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + + /* + * If this is the first call after Begin or ReScan, mark the connection + * as used by the ForeignScan node. + */ + if (fsstate->conn_state->activated == NULL) + fsstate->conn_state->activated = node; + + /* + * If the connection has already been used by a ForeignScan node, put it + * at the end of the chain of waiting ForeignScan nodes, and then return. + */ + if (node != fsstate->conn_state->activated) + { + ForeignScanState *curr_node = fsstate->conn_state->activated; + PgFdwScanState *curr_fsstate = (PgFdwScanState *) curr_node->fdw_state; + + /* Scan down the chain ... */ + while (curr_fsstate->next_node) + { + curr_node = curr_fsstate->next_node; + Assert(node != curr_node); + curr_fsstate = (PgFdwScanState *) curr_node->fdw_state; + } + /* Update the chain linking */ + curr_fsstate->next_node = node; + /* Mark the request as needing a callback */ + areq->callback_pending = true; + areq->request_complete = false; + return; + } + + request_tuple_asynchronously(areq); +} + +/* + * postgresForeignAsyncConfigureWait + * Configure a file descriptor event for which we wish to wait. + */ +static void +postgresForeignAsyncConfigureWait(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + AppendState *requestor = (AppendState *) areq->requestor; + WaitEventSet *set = requestor->as_eventset; + + /* This function should not be called unless callback_pending */ + Assert(areq->callback_pending); + + /* If the ForeignScan node isn't activated yet, nothing to do */ + if (fsstate->conn_state->activated != node) + return; + + AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn), + NULL, areq); +} + +/* + * postgresForeignAsyncNotify + * Fetch some more tuples from a file descriptor that becomes ready, + * requesting next tuple. + */ +static void +postgresForeignAsyncNotify(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + + /* The core code would have initialized the callback_pending flag */ + Assert(!areq->callback_pending); + + fetch_more_data(node); + + request_tuple_asynchronously(areq); +} + +/* + * Asynchronously request next tuple from a foreign PostgreSQL table. + */ +static void +request_tuple_asynchronously(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + TupleTableSlot *result; + + /* Request some more tuples, if we've run out */ + if (fsstate->next_tuple >= fsstate->num_tuples) + { + /* No point in another fetch if we already detected EOF, though */ + if (!fsstate->eof_reached) + { + /* Begin another fetch */ + fetch_more_data_begin(node); + /* Mark the request as needing a callback */ + areq->callback_pending = true; + areq->request_complete = false; + return; + } + fsstate->conn_state->activated = NULL; + + /* Activate the next ForeignScan node if any */ + if (fsstate->next_node) + { + /* Mark the connection as used by the next ForeignScan node */ + fsstate->conn_state->activated = fsstate->next_node; + Assert(!fsstate->conn_state->async_query_sent); + /* Begin an asynchronous fetch for that node */ + fetch_more_data_begin(fsstate->next_node); + } + + /* There's nothing more to do; just return a NULL pointer */ + result = NULL; + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, result); + return; + } + + /* Get a tuple from the ForeignScan node */ + result = ExecProcNode((PlanState *) node); + + if (TupIsNull(result)) + { + Assert(fsstate->next_tuple >= fsstate->num_tuples); + + /* Request some more tuples, if we've not detected EOF yet */ + if (!fsstate->eof_reached) + { + /* Begin another fetch */ + fetch_more_data_begin(node); + /* Mark the request as needing a callback */ + areq->callback_pending = true; + areq->request_complete = false; + return; + } + fsstate->conn_state->activated = NULL; + + /* Activate the next ForeignScan node if any */ + if (fsstate->next_node) + { + /* Mark the connection as used by the next ForeignScan node */ + fsstate->conn_state->activated = fsstate->next_node; + Assert(!fsstate->conn_state->async_query_sent); + /* Begin an asynchronous fetch for that node */ + fetch_more_data_begin(fsstate->next_node); + } + } + + /* Mark the request as complete */ + ExecAsyncRequestDone(areq, result); +} + /* * Create a tuple from the specified row of the PGresult. * diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h index 1f67b4d9fd..c3537b6449 100644 --- a/contrib/postgres_fdw/postgres_fdw.h +++ b/contrib/postgres_fdw/postgres_fdw.h @@ -16,6 +16,7 @@ #include "foreign/foreign.h" #include "lib/stringinfo.h" #include "libpq-fe.h" +#include "nodes/execnodes.h" #include "nodes/pathnodes.h" #include "utils/relcache.h" @@ -124,12 +125,22 @@ typedef struct PgFdwRelationInfo int relation_index; } PgFdwRelationInfo; +/* + * Extra control information relating to a connection. + */ +typedef struct PgFdwConnState +{ + ForeignScanState *activated; /* currently-activated ForeignScan node */ + bool async_query_sent; /* has an asynchronous query been sent? */ +} PgFdwConnState; + /* in postgres_fdw.c */ extern int set_transmission_modes(void); extern void reset_transmission_modes(int nestlevel); /* in connection.c */ -extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt); +extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt, + PgFdwConnState **state); extern void ReleaseConnection(PGconn *conn); extern unsigned int GetCursorNumber(PGconn *conn); extern unsigned int GetPrepStmtNumber(PGconn *conn); diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 151f4f1834..ceda16b92f 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -1822,31 +1822,31 @@ INSERT INTO b(aa) VALUES('bbb'); INSERT INTO b(aa) VALUES('bbbb'); INSERT INTO b(aa) VALUES('bbbbb'); -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; UPDATE b SET aa = 'new'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; UPDATE a SET aa = 'newtoo'; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; DELETE FROM a; -SELECT tableoid::regclass, * FROM a; +SELECT tableoid::regclass, * FROM a ORDER BY 1, 2; SELECT tableoid::regclass, * FROM b; SELECT tableoid::regclass, * FROM ONLY a; @@ -1882,12 +1882,12 @@ insert into bar2 values(4,44,44); insert into bar2 values(7,77,77); explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for update; -select * from bar where f1 in (select f1 from foo) for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; +select * from bar where f1 in (select f1 from foo) order by 1 for update; explain (verbose, costs off) -select * from bar where f1 in (select f1 from foo) for share; -select * from bar where f1 in (select f1 from foo) for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; +select * from bar where f1 in (select f1 from foo) order by 1 for share; -- Check UPDATE with inherited target and an inherited source table explain (verbose, costs off) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 5ef1c7ad3c..4a9eece710 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4735,6 +4735,20 @@ ANY num_sync ( + enable_async_append (boolean) + + enable_async_append configuration parameter + + + + + Enables or disables the query planner's use of async-aware + append plan types. The default is on. + + + + enable_bitmapscan (boolean) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index c602ee4427..a2d2f42e28 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1563,6 +1563,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser + + AppendReady + Waiting for a subplan of Append to be ready. + BackupWaitWalArchive Waiting for WAL files required for a backup to be successfully diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index f80e379973..a2b7b8bd67 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -1390,6 +1390,8 @@ ExplainNode(PlanState *planstate, List *ancestors, } if (plan->parallel_aware) appendStringInfoString(es->str, "Parallel "); + if (planstate->async_capable) + appendStringInfoString(es->str, "Async "); appendStringInfoString(es->str, pname); es->indent++; } @@ -1409,6 +1411,7 @@ ExplainNode(PlanState *planstate, List *ancestors, if (custom_name) ExplainPropertyText("Custom Plan Provider", custom_name, es); ExplainPropertyBool("Parallel Aware", plan->parallel_aware, es); + ExplainPropertyBool("Async Capable", planstate->async_capable, es); } switch (nodeTag(plan)) diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile index f990c6473a..1004647d4f 100644 --- a/src/backend/executor/Makefile +++ b/src/backend/executor/Makefile @@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global OBJS = \ execAmi.o \ + execAsync.o \ execCurrent.o \ execExpr.o \ execExprInterp.o \ diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c index 23bdb53cd1..613835b748 100644 --- a/src/backend/executor/execAmi.c +++ b/src/backend/executor/execAmi.c @@ -526,6 +526,10 @@ ExecSupportsBackwardScan(Plan *node) { ListCell *l; + /* With async, tuples may be interleaved, so can't back up. */ + if (((Append *) node)->nasyncplans != 0) + return false; + foreach(l, ((Append *) node)->appendplans) { if (!ExecSupportsBackwardScan((Plan *) lfirst(l))) diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c index e69de29bb2..4e87ae6489 100644 --- a/src/backend/executor/execAsync.c +++ b/src/backend/executor/execAsync.c @@ -0,0 +1,138 @@ +/*------------------------------------------------------------------------- + * + * execAsync.c + * Support routines for asynchronous execution + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/executor/execAsync.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "executor/execAsync.h" +#include "executor/nodeAppend.h" +#include "executor/nodeForeignscan.h" + +static void ExecAsyncResponse(AsyncRequest *areq); + +/* + * Re-examine a plan node that was considered async-capable at plan time. + */ +bool +ExecReconsiderAsyncCapablePlan(PlanState *node, AsyncContext *acxt) +{ + bool result; + + switch (nodeTag(node)) + { + case T_ForeignScanState: + result = ExecReconsiderAsyncForeignScan((ForeignScanState *) node, + acxt); + break; + default: + /* If the node doesn't support async, caller messed up. */ + elog(ERROR, "unrecognized node type: %d", + (int) nodeTag(node)); + result = false; /* keep compiler quiet */ + break; + } + + return result; +} + +/* + * Asynchronously request a tuple from a designed async-capable node. + */ +void +ExecAsyncRequest(AsyncRequest *areq) +{ + switch (nodeTag(areq->requestee)) + { + case T_ForeignScanState: + ExecAsyncForeignScanRequest(areq); + break; + default: + /* If the node doesn't support async, caller messed up. */ + elog(ERROR, "unrecognized node type: %d", + (int) nodeTag(areq->requestee)); + } + + ExecAsyncResponse(areq); +} + +/* + * Give the asynchronous node a chance to configure the file descriptor event + * for which it wishes to wait. We expect the node-type specific callback to + * make a sigle call of the following form: + * + * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq); + */ +void +ExecAsyncConfigureWait(AsyncRequest *areq) +{ + switch (nodeTag(areq->requestee)) + { + case T_ForeignScanState: + ExecAsyncForeignScanConfigureWait(areq); + break; + default: + /* If the node doesn't support async, caller messed up. */ + elog(ERROR, "unrecognized node type: %d", + (int) nodeTag(areq->requestee)); + } +} + +/* + * Call the asynchronous node back when a relevant event has occurred. + */ +void +ExecAsyncNotify(AsyncRequest *areq) +{ + switch (nodeTag(areq->requestee)) + { + case T_ForeignScanState: + ExecAsyncForeignScanNotify(areq); + break; + default: + /* If the node doesn't support async, caller messed up. */ + elog(ERROR, "unrecognized node type: %d", + (int) nodeTag(areq->requestee)); + } + + ExecAsyncResponse(areq); +} + +/* + * Call the requestor back when an asynchronous node has produced a result. + */ +static void +ExecAsyncResponse(AsyncRequest *areq) +{ + switch (nodeTag(areq->requestor)) + { + case T_AppendState: + ExecAsyncAppendResponse(areq); + break; + default: + /* If the node doesn't support async, caller messed up. */ + elog(ERROR, "unrecognized node type: %d", + (int) nodeTag(areq->requestor)); + } +} + +/* + * A requestee node should call this function to deliver the tuple to its + * requestor node. The node can call this from its ExecAsyncRequest callback + * if the requested tuple is available immediately. + */ +void +ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result) +{ + areq->request_complete = true; + areq->result = result; +} diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index c74ce36ffb..caf11dc4e0 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -48,6 +48,7 @@ #include "commands/matview.h" #include "commands/trigger.h" #include "executor/execdebug.h" +#include "executor/nodeAppend.h" #include "executor/nodeSubplan.h" #include "foreign/fdwapi.h" #include "jit/jit.h" @@ -78,6 +79,8 @@ ExecutorCheckPerms_hook_type ExecutorCheckPerms_hook = NULL; /* decls for local routines only used within this module */ static void InitPlan(QueryDesc *queryDesc, int eflags); static void CheckValidRowMarkRel(Relation rel, RowMarkType markType); +static void ExecBuildForeignScanHashTable(EState *estate); +static void ExecReconsiderPlan(EState *estate); static void ExecPostprocessPlan(EState *estate); static void ExecEndPlan(PlanState *planstate, EState *estate); static void ExecutePlan(EState *estate, PlanState *planstate, @@ -886,6 +889,9 @@ InitPlan(QueryDesc *queryDesc, int eflags) /* signal that this EState is not used for EPQ */ estate->es_epq_active = NULL; + if (plannedstmt->asyncPlan) + ExecBuildForeignScanHashTable(estate); + /* * Initialize private state information for each SubPlan. We must do this * before running ExecInitNode on the main query tree, since @@ -924,6 +930,9 @@ InitPlan(QueryDesc *queryDesc, int eflags) */ planstate = ExecInitNode(plan, estate, eflags); + if (plannedstmt->asyncPlan) + ExecReconsiderPlan(estate); + /* * Get the tuple descriptor describing the type of tuples to return. */ @@ -1321,6 +1330,35 @@ ExecGetTriggerResultRel(EState *estate, Oid relid) return rInfo; } +static void +ExecBuildForeignScanHashTable(EState *estate) +{ + HASHCTL ctl; + + MemSet(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(Oid); + ctl.entrysize = sizeof(ForeignScanHashEntry); + ctl.hcxt = CurrentMemoryContext; + + estate->es_foreign_scan_hash = + hash_create("User mapping dependency table", 256, + &ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); +} + +static void +ExecReconsiderPlan(EState *estate) +{ + ListCell *lc; + + foreach(lc, estate->es_asyncappends) + { + AppendState *appendstate = (AppendState *) lfirst(lc); + + ExecReconsiderAsyncAppend(appendstate); + } +} + /* ---------------------------------------------------------------- * ExecPostprocessPlan * diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c index c734283bfe..df7b9b591b 100644 --- a/src/backend/executor/execUtils.c +++ b/src/backend/executor/execUtils.c @@ -156,6 +156,9 @@ CreateExecutorState(void) estate->es_use_parallel_mode = false; + estate->es_asyncappends = NIL; + estate->es_foreign_scan_hash = NULL; + estate->es_jit_flags = 0; estate->es_jit = NULL; diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 15e4115bd6..3896d9fcd4 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -57,10 +57,13 @@ #include "postgres.h" +#include "executor/execAsync.h" #include "executor/execdebug.h" #include "executor/execPartition.h" #include "executor/nodeAppend.h" #include "miscadmin.h" +#include "pgstat.h" +#include "storage/latch.h" /* Shared state for parallel-aware Append. */ struct ParallelAppendState @@ -78,12 +81,18 @@ struct ParallelAppendState }; #define INVALID_SUBPLAN_INDEX -1 +#define EVENT_BUFFER_SIZE 16 static TupleTableSlot *ExecAppend(PlanState *pstate); static bool choose_next_subplan_locally(AppendState *node); static bool choose_next_subplan_for_leader(AppendState *node); static bool choose_next_subplan_for_worker(AppendState *node); static void mark_invalid_subplans_as_finished(AppendState *node); +static void ExecAppendAsyncBegin(AppendState *node); +static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result); +static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result); +static void ExecAppendAsyncEventWait(AppendState *node); +static void classify_matching_subplans(AppendState *node); /* ---------------------------------------------------------------- * ExecInitAppend @@ -102,7 +111,10 @@ ExecInitAppend(Append *node, EState *estate, int eflags) AppendState *appendstate = makeNode(AppendState); PlanState **appendplanstates; Bitmapset *validsubplans; + Bitmapset *asyncplans; + Bitmapset *asyncplanids; int nplans; + int nasyncplans; int firstvalid; int i, j; @@ -119,6 +131,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags) /* Let choose_next_subplan_* function handle setting the first subplan */ appendstate->as_whichplan = INVALID_SUBPLAN_INDEX; + appendstate->as_syncdone = false; /* If run-time partition pruning is enabled, then set that up now */ if (node->part_prune_info != NULL) @@ -191,12 +204,27 @@ ExecInitAppend(Append *node, EState *estate, int eflags) * While at it, find out the first valid partial plan. */ j = 0; + asyncplans = NULL; + asyncplanids = NULL; + nasyncplans = 0; firstvalid = nplans; i = -1; while ((i = bms_next_member(validsubplans, i)) >= 0) { Plan *initNode = (Plan *) list_nth(node->appendplans, i); + /* + * Record async subplans. When executing EvalPlanQual, we process + * async subplans synchronously, so don't do this in that case. + */ + if (initNode->async_capable && estate->es_epq_active == NULL) + { + asyncplans = bms_add_member(asyncplans, j); + asyncplanids = bms_add_member(asyncplanids, + initNode->plan_node_id); + nasyncplans++; + } + /* * Record the lowest appendplans index which is a valid partial plan. */ @@ -210,6 +238,11 @@ ExecInitAppend(Append *node, EState *estate, int eflags) appendstate->appendplans = appendplanstates; appendstate->as_nplans = nplans; + /* Initialize async state */ + appendstate->as_asyncplans = asyncplans; + appendstate->as_asyncplanids = asyncplanids; + appendstate->as_nasyncplans = nasyncplans; + /* * Miscellaneous initialization */ @@ -219,6 +252,15 @@ ExecInitAppend(Append *node, EState *estate, int eflags) /* For parallel query, this will be overridden later. */ appendstate->choose_next_subplan = choose_next_subplan_locally; + /* + * Lastly, if there is at least one async subplan, add the Append node to + * estate->es_asyncappends so that we can re-examine it in + * ExecReconsiderPlan. + */ + if (nasyncplans > 0) + estate->es_asyncappends = lappend(estate->es_asyncappends, + appendstate); + return appendstate; } @@ -232,31 +274,45 @@ static TupleTableSlot * ExecAppend(PlanState *pstate) { AppendState *node = castNode(AppendState, pstate); + TupleTableSlot *result; - if (node->as_whichplan < 0) + if (!node->as_syncdone && node->as_whichplan == INVALID_SUBPLAN_INDEX) { /* Nothing to do if there are no subplans */ if (node->as_nplans == 0) return ExecClearTuple(node->ps.ps_ResultTupleSlot); + /* If there are any async subplans, begin execution of them */ + if (node->as_nasyncplans > 0) + ExecAppendAsyncBegin(node); + /* - * If no subplan has been chosen, we must choose one before + * If no sync subplan has been chosen, we must choose one before * proceeding. */ - if (node->as_whichplan == INVALID_SUBPLAN_INDEX && - !node->choose_next_subplan(node)) + if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0) return ExecClearTuple(node->ps.ps_ResultTupleSlot); } for (;;) { PlanState *subnode; - TupleTableSlot *result; CHECK_FOR_INTERRUPTS(); /* - * figure out which subplan we are currently processing + * try to get a tuple from any of the async subplans + */ + if (!bms_is_empty(node->as_needrequest) || + (node->as_syncdone && node->as_nasyncremain > 0)) + { + if (ExecAppendAsyncGetNext(node, &result)) + return result; + Assert(bms_is_empty(node->as_needrequest)); + } + + /* + * figure out which sync subplan we are currently processing */ Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans); subnode = node->appendplans[node->as_whichplan]; @@ -276,8 +332,16 @@ ExecAppend(PlanState *pstate) return result; } - /* choose new subplan; if none, we're done */ - if (!node->choose_next_subplan(node)) + /* wait or poll async events */ + if (node->as_nasyncremain > 0) + { + Assert(!node->as_syncdone); + Assert(bms_is_empty(node->as_needrequest)); + ExecAppendAsyncEventWait(node); + } + + /* choose new sync subplan; if no sync/async subplans, we're done */ + if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0) return ExecClearTuple(node->ps.ps_ResultTupleSlot); } } @@ -313,6 +377,7 @@ ExecEndAppend(AppendState *node) void ExecReScanAppend(AppendState *node) { + int nasyncplans = node->as_nasyncplans; int i; /* @@ -326,6 +391,11 @@ ExecReScanAppend(AppendState *node) { bms_free(node->as_valid_subplans); node->as_valid_subplans = NULL; + if (nasyncplans > 0) + { + bms_free(node->as_valid_asyncplans); + node->as_valid_asyncplans = NULL; + } } for (i = 0; i < node->as_nplans; i++) @@ -347,8 +417,26 @@ ExecReScanAppend(AppendState *node) ExecReScan(subnode); } + /* Reset async state */ + if (nasyncplans > 0) + { + i = -1; + while ((i = bms_next_member(node->as_asyncplans, i)) >= 0) + { + AsyncRequest *areq = node->as_asyncrequests[i]; + + areq->callback_pending = false; + areq->request_complete = false; + areq->result = NULL; + } + + bms_free(node->as_needrequest); + node->as_needrequest = NULL; + } + /* Let choose_next_subplan_* function handle setting the first subplan */ node->as_whichplan = INVALID_SUBPLAN_INDEX; + node->as_syncdone = false; } /* ---------------------------------------------------------------- @@ -429,7 +517,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt) /* ---------------------------------------------------------------- * choose_next_subplan_locally * - * Choose next subplan for a non-parallel-aware Append, + * Choose next sync subplan for a non-parallel-aware Append, * returning false if there are no more. * ---------------------------------------------------------------- */ @@ -444,9 +532,9 @@ choose_next_subplan_locally(AppendState *node) /* * If first call then have the bms member function choose the first valid - * subplan by initializing whichplan to -1. If there happen to be no - * valid subplans then the bms member function will handle that by - * returning a negative number which will allow us to exit returning a + * sync subplan by initializing whichplan to -1. If there happen to be + * no valid sync subplans then the bms member function will handle that + * by returning a negative number which will allow us to exit returning a * false value. */ if (whichplan == INVALID_SUBPLAN_INDEX) @@ -467,7 +555,10 @@ choose_next_subplan_locally(AppendState *node) nextplan = bms_prev_member(node->as_valid_subplans, whichplan); if (nextplan < 0) + { + node->as_syncdone = true; return false; + } node->as_whichplan = nextplan; @@ -709,3 +800,362 @@ mark_invalid_subplans_as_finished(AppendState *node) node->as_pstate->pa_finished[i] = true; } } + +/* ---------------------------------------------------------------- + * ExecReconsiderAsyncAppend + * + * Re-examine an async-aware Append node + * ---------------------------------------------------------------- + */ +void +ExecReconsiderAsyncAppend(AppendState *node) +{ + Bitmapset *asyncplans = bms_copy(node->as_asyncplans); + int nasyncplans; + AsyncRequest **asyncrequests; + AsyncContext acxt; + int i; + + asyncrequests = (AsyncRequest **) palloc0(node->as_nplans * + sizeof(AsyncRequest *)); + + /* Re-examine each async subplan */ + acxt.requestor = (PlanState *) node; + i = -1; + while ((i = bms_next_member(asyncplans, i)) >= 0) + { + PlanState *subnode = node->appendplans[i]; + + acxt.request_index = i; + if (!ExecReconsiderAsyncCapablePlan(subnode, &acxt)) + { + bms_del_member(node->as_asyncplans, i); + bms_del_member(node->as_asyncplanids, + subnode->plan->plan_node_id); + --node->as_nasyncplans; + } + else + { + AsyncRequest *areq; + + areq = palloc(sizeof(AsyncRequest)); + areq->requestor = (PlanState *) node; + areq->requestee = subnode; + areq->request_index = i; + areq->callback_pending = false; + areq->request_complete = false; + areq->result = NULL; + + asyncrequests[i] = areq; + } + } + bms_free(asyncplans); + + /* No need for further processing if there are no async subplans */ + nasyncplans = node->as_nasyncplans; + if (nasyncplans == 0) + return; + + /* Initialize remaining async state */ + node->as_asyncrequests = asyncrequests; + node->as_asyncresults = (TupleTableSlot **) + palloc0(nasyncplans * sizeof(TupleTableSlot *)); + node->as_needrequest = NULL; + + classify_matching_subplans(node); +} + +/* ---------------------------------------------------------------- + * ExecAppendAsyncBegin + * + * Begin execution of designed async-capable subplans. + * ---------------------------------------------------------------- + */ +static void +ExecAppendAsyncBegin(AppendState *node) +{ + Bitmapset *valid_asyncplans; + int i; + + /* We should never be called when there are no async subplans. */ + Assert(node->as_nasyncplans > 0); + + if (node->as_valid_subplans == NULL) + { + Assert(node->as_valid_asyncplans == NULL); + + node->as_valid_subplans = + ExecFindMatchingSubPlans(node->as_prune_state); + + classify_matching_subplans(node); + } + + node->as_nasyncremain = 0; + + /* Nothing to do if there are no valid async subplans. */ + valid_asyncplans = node->as_valid_asyncplans; + if (valid_asyncplans == NULL) + return; + + /* Make a request for each of the async subplans. */ + i = -1; + while ((i = bms_next_member(valid_asyncplans, i)) >= 0) + { + AsyncRequest *areq = node->as_asyncrequests[i]; + + Assert(areq->request_index == i); + Assert(!areq->callback_pending); + + /* Do the actual work. */ + ExecAsyncRequest(areq); + + ++node->as_nasyncremain; + } +} + +/* ---------------------------------------------------------------- + * ExecAppendAsyncGetNext + * + * Get the next tuple from any of the asynchronous subplans. + * ---------------------------------------------------------------- + */ +static bool +ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result) +{ + *result = NULL; + + /* Make new async requests. */ + if (ExecAppendAsyncRequest(node, result)) + return true; + + while (node->as_nasyncremain > 0) + { + CHECK_FOR_INTERRUPTS(); + + /* Wait or poll async events. */ + ExecAppendAsyncEventWait(node); + + /* Make new async requests. */ + if (ExecAppendAsyncRequest(node, result)) + return true; + + /* Break from loop if there is any sync node that is not complete */ + if (!node->as_syncdone) + break; + } + + /* + * If all sync subplans are complete, we're totally done scanning the + * givne node. Otherwise, we're done with the asynchronous stuff but + * must continue scanning the sync subplans. + */ + if (node->as_syncdone) + { + Assert(node->as_nasyncremain == 0); + *result = ExecClearTuple(node->ps.ps_ResultTupleSlot); + return true; + } + + return false; +} + +/* ---------------------------------------------------------------- + * ExecAppendAsyncRequest + * + * If there are any asynchronous subplans that need a new asynchronous + * request, make all of them. + * ---------------------------------------------------------------- + */ +static bool +ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result) +{ + Bitmapset *needrequest; + int i; + + /* Nothing to do if there are no async subplans needing a new request. */ + if (bms_is_empty(node->as_needrequest)) + return false; + + /* + * If there are any asynchronously-generated results that have not yet + * been returned, we have nothing to do; just return one of them. + */ + if (node->as_nasyncresults > 0) + { + --node->as_nasyncresults; + *result = node->as_asyncresults[node->as_nasyncresults]; + return true; + } + + /* Make a new request for each of the async subplans that need it. */ + needrequest = node->as_needrequest; + node->as_needrequest = NULL; + i = -1; + while ((i = bms_next_member(needrequest, i)) >= 0) + { + AsyncRequest *areq = node->as_asyncrequests[i]; + + /* Do the actual work. */ + ExecAsyncRequest(areq); + } + bms_free(needrequest); + + /* Return one of the asynchronously-generated results if any. */ + if (node->as_nasyncresults > 0) + { + --node->as_nasyncresults; + *result = node->as_asyncresults[node->as_nasyncresults]; + return true; + } + + return false; +} + +/* ---------------------------------------------------------------- + * ExecAppendAsyncEventWait + * + * Wait or poll for file descriptor wait events and fire callbacks. + * ---------------------------------------------------------------- + */ +static void +ExecAppendAsyncEventWait(AppendState *node) +{ + long timeout = node->as_syncdone ? -1 : 0; + WaitEvent occurred_event[EVENT_BUFFER_SIZE]; + int noccurred; + int i; + + /* Nothing to do if there are no remaining async subplans. */ + if (node->as_nasyncremain == 0) + return; + + node->as_eventset = CreateWaitEventSet(CurrentMemoryContext, + node->as_nasyncplans + 1); + AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, + NULL, NULL); + + /* Give each waiting subplan a chance to add a event. */ + i = -1; + while ((i = bms_next_member(node->as_asyncplans, i)) >= 0) + { + AsyncRequest *areq = node->as_asyncrequests[i]; + + if (areq->callback_pending) + ExecAsyncConfigureWait(areq); + } + + /* Wait for at least one event to occur. */ + noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event, + EVENT_BUFFER_SIZE, WAIT_EVENT_APPEND_READY); + FreeWaitEventSet(node->as_eventset); + node->as_eventset = NULL; + if (noccurred == 0) + return; + + /* Deliver notifications. */ + for (i = 0; i < noccurred; i++) + { + WaitEvent *w = &occurred_event[i]; + + /* + * Each waiting subplan should have registered its wait event with + * user_data pointing back to its AsyncRequest. + */ + if ((w->events & WL_SOCKET_READABLE) != 0) + { + AsyncRequest *areq = (AsyncRequest *) w->user_data; + + /* + * Mark it as no longer needing a callback. We must do this + * before dispatching the callback in case the callback resets + * the flag. + */ + Assert(areq->callback_pending); + areq->callback_pending = false; + + /* Do the actual work. */ + ExecAsyncNotify(areq); + } + } +} + +/* ---------------------------------------------------------------- + * ExecAsyncAppendResponse + * + * Receive a response from an asynchronous request we made. + * ---------------------------------------------------------------- + */ +void +ExecAsyncAppendResponse(AsyncRequest *areq) +{ + AppendState *node = (AppendState *) areq->requestor; + TupleTableSlot *slot = areq->result; + + /* The result should be a TupleTableSlot or NULL. */ + Assert(slot == NULL || IsA(slot, TupleTableSlot)); + + /* Nothing to do if the request is pending. */ + if (!areq->request_complete) + { + /* + * The subplan for which the request was made would be pending for a + * callback. + */ + Assert(areq->callback_pending); + return; + } + + /* If the result is NULL or an empty slot, there's nothing more to do. */ + if (TupIsNull(slot)) + { + /* The ending subplan wouldn't have been pending for a callback. */ + Assert(!areq->callback_pending); + --node->as_nasyncremain; + return; + } + + /* Save result so we can return it */ + Assert(node->as_nasyncresults < node->as_nasyncplans); + node->as_asyncresults[node->as_nasyncresults++] = slot; + + /* + * Mark the subplan that returned a result as ready for a new request. We + * don't launch another one here immediately because it might complete. + */ + node->as_needrequest = bms_add_member(node->as_needrequest, + areq->request_index); +} + +/* ---------------------------------------------------------------- + * classify_matching_subplans + * + * Classify the node's as_valid_subplans into sync ones and + * async ones, adjust it to contain sync ones only, and save + * async ones in the node's as_valid_asyncplans + * ---------------------------------------------------------------- + */ +static void +classify_matching_subplans(AppendState *node) +{ + Bitmapset *valid_asyncplans; + + /* Nothing to do if there are no valid subplans. */ + if (bms_is_empty(node->as_valid_subplans)) + return; + + /* Nothing to do if there are no valid async subplans. */ + if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans)) + return; + + /* Get valid async subplans. */ + valid_asyncplans = bms_copy(node->as_asyncplans); + valid_asyncplans = bms_int_members(valid_asyncplans, + node->as_valid_subplans); + + /* Adjust the valid subplans to contain sync subplans only. */ + node->as_valid_subplans = bms_del_members(node->as_valid_subplans, + valid_asyncplans); + + /* Save valid async subplans. */ + node->as_valid_asyncplans = valid_asyncplans; +} diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c index 0969e53c3a..c92a35b8a6 100644 --- a/src/backend/executor/nodeForeignscan.c +++ b/src/backend/executor/nodeForeignscan.c @@ -22,6 +22,7 @@ */ #include "postgres.h" +#include "executor/execAsync.h" #include "executor/executor.h" #include "executor/nodeForeignscan.h" #include "foreign/fdwapi.h" @@ -222,6 +223,9 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags) if (node->resultRelation > 0) scanstate->resultRelInfo = estate->es_result_relations[node->resultRelation - 1]; + /* Initialize the async_capable flag. */ + scanstate->ss.ps.async_capable = ((Plan *) node)->async_capable; + /* Initialize any outer plan. */ if (outerPlan(node)) outerPlanState(scanstate) = @@ -391,3 +395,73 @@ ExecShutdownForeignScan(ForeignScanState *node) if (fdwroutine->ShutdownForeignScan) fdwroutine->ShutdownForeignScan(node); } + +/* ---------------------------------------------------------------- + * ExecReconsiderAsyncForeignScan + * + * Re-examine a ForeignScan node that was considered async-capable + * at plan time. + * ---------------------------------------------------------------- + */ +bool +ExecReconsiderAsyncForeignScan(ForeignScanState *node, AsyncContext *acxt) +{ + FdwRoutine *fdwroutine = node->fdwroutine; + bool result = true; + + if (fdwroutine->ReconsiderAsyncForeignScan) + { + result = fdwroutine->ReconsiderAsyncForeignScan(node, acxt); + if (!result) + node->ss.ps.async_capable = false; + } + return result; +} + +/* ---------------------------------------------------------------- + * ExecAsyncForeignScanRequest + * + * Asynchronously request a tuple from a designed async-capable node + * ---------------------------------------------------------------- + */ +void +ExecAsyncForeignScanRequest(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + FdwRoutine *fdwroutine = node->fdwroutine; + + Assert(fdwroutine->ForeignAsyncRequest != NULL); + fdwroutine->ForeignAsyncRequest(areq); +} + +/* ---------------------------------------------------------------- + * ExecAsyncForeignScanConfigureWait + * + * In async mode, configure for a wait + * ---------------------------------------------------------------- + */ +void +ExecAsyncForeignScanConfigureWait(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + FdwRoutine *fdwroutine = node->fdwroutine; + + Assert(fdwroutine->ForeignAsyncConfigureWait != NULL); + fdwroutine->ForeignAsyncConfigureWait(areq); +} + +/* ---------------------------------------------------------------- + * ExecAsyncForeignScanNotify + * + * Callback invoked when a relevant event has occurred + * ---------------------------------------------------------------- + */ +void +ExecAsyncForeignScanNotify(AsyncRequest *areq) +{ + ForeignScanState *node = (ForeignScanState *) areq->requestee; + FdwRoutine *fdwroutine = node->fdwroutine; + + Assert(fdwroutine->ForeignAsyncNotify != NULL); + fdwroutine->ForeignAsyncNotify(areq); +} diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 65bbc18ecb..8d64772931 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -87,6 +87,7 @@ _copyPlannedStmt(const PlannedStmt *from) COPY_SCALAR_FIELD(transientPlan); COPY_SCALAR_FIELD(dependsOnRole); COPY_SCALAR_FIELD(parallelModeNeeded); + COPY_SCALAR_FIELD(asyncPlan); COPY_SCALAR_FIELD(jitFlags); COPY_NODE_FIELD(planTree); COPY_NODE_FIELD(rtable); @@ -120,6 +121,7 @@ CopyPlanFields(const Plan *from, Plan *newnode) COPY_SCALAR_FIELD(plan_width); COPY_SCALAR_FIELD(parallel_aware); COPY_SCALAR_FIELD(parallel_safe); + COPY_SCALAR_FIELD(async_capable); COPY_SCALAR_FIELD(plan_node_id); COPY_NODE_FIELD(targetlist); COPY_NODE_FIELD(qual); @@ -241,6 +243,7 @@ _copyAppend(const Append *from) */ COPY_BITMAPSET_FIELD(apprelids); COPY_NODE_FIELD(appendplans); + COPY_SCALAR_FIELD(nasyncplans); COPY_SCALAR_FIELD(first_partial_plan); COPY_NODE_FIELD(part_prune_info); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index f5dcedf6e8..80a853d706 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -305,6 +305,7 @@ _outPlannedStmt(StringInfo str, const PlannedStmt *node) WRITE_BOOL_FIELD(transientPlan); WRITE_BOOL_FIELD(dependsOnRole); WRITE_BOOL_FIELD(parallelModeNeeded); + WRITE_BOOL_FIELD(asyncPlan); WRITE_INT_FIELD(jitFlags); WRITE_NODE_FIELD(planTree); WRITE_NODE_FIELD(rtable); @@ -333,6 +334,7 @@ _outPlanInfo(StringInfo str, const Plan *node) WRITE_INT_FIELD(plan_width); WRITE_BOOL_FIELD(parallel_aware); WRITE_BOOL_FIELD(parallel_safe); + WRITE_BOOL_FIELD(async_capable); WRITE_INT_FIELD(plan_node_id); WRITE_NODE_FIELD(targetlist); WRITE_NODE_FIELD(qual); @@ -431,6 +433,7 @@ _outAppend(StringInfo str, const Append *node) WRITE_BITMAPSET_FIELD(apprelids); WRITE_NODE_FIELD(appendplans); + WRITE_INT_FIELD(nasyncplans); WRITE_INT_FIELD(first_partial_plan); WRITE_NODE_FIELD(part_prune_info); } @@ -2221,6 +2224,7 @@ _outPlannerGlobal(StringInfo str, const PlannerGlobal *node) WRITE_BOOL_FIELD(parallelModeOK); WRITE_BOOL_FIELD(parallelModeNeeded); WRITE_CHAR_FIELD(maxParallelHazard); + WRITE_BOOL_FIELD(asyncPlan); } static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 4388aae71d..5104d1a2b4 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1581,6 +1581,7 @@ _readPlannedStmt(void) READ_BOOL_FIELD(transientPlan); READ_BOOL_FIELD(dependsOnRole); READ_BOOL_FIELD(parallelModeNeeded); + READ_BOOL_FIELD(asyncPlan); READ_INT_FIELD(jitFlags); READ_NODE_FIELD(planTree); READ_NODE_FIELD(rtable); @@ -1614,6 +1615,7 @@ ReadCommonPlan(Plan *local_node) READ_INT_FIELD(plan_width); READ_BOOL_FIELD(parallel_aware); READ_BOOL_FIELD(parallel_safe); + READ_BOOL_FIELD(async_capable); READ_INT_FIELD(plan_node_id); READ_NODE_FIELD(targetlist); READ_NODE_FIELD(qual); @@ -1710,6 +1712,7 @@ _readAppend(void) READ_BITMAPSET_FIELD(apprelids); READ_NODE_FIELD(appendplans); + READ_INT_FIELD(nasyncplans); READ_INT_FIELD(first_partial_plan); READ_NODE_FIELD(part_prune_info); diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index aab06c7d21..3b034a0326 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -147,6 +147,7 @@ bool enable_partitionwise_aggregate = false; bool enable_parallel_append = true; bool enable_parallel_hash = true; bool enable_partition_pruning = true; +bool enable_async_append = true; typedef struct { diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 6c8305c977..b30f8255f2 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -81,6 +81,7 @@ static List *get_gating_quals(PlannerInfo *root, List *quals); static Plan *create_gating_plan(PlannerInfo *root, Path *path, Plan *plan, List *gating_quals); static Plan *create_join_plan(PlannerInfo *root, JoinPath *best_path); +static bool is_async_capable_path(Path *path); static Plan *create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags); static Plan *create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path, @@ -1066,6 +1067,30 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path) return plan; } +/* + * is_async_capable_path + * Check whether a given Path node is async-capable. + */ +static bool +is_async_capable_path(Path *path) +{ + switch (nodeTag(path)) + { + case T_ForeignPath: + { + FdwRoutine *fdwroutine = path->parent->fdwroutine; + + Assert(fdwroutine != NULL); + if (fdwroutine->IsForeignPathAsyncCapable != NULL && + fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path)) + return true; + } + default: + break; + } + return false; +} + /* * create_append_plan * Create an Append plan for 'best_path' and (recursively) plans @@ -1083,6 +1108,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) List *pathkeys = best_path->path.pathkeys; List *subplans = NIL; ListCell *subpaths; + int nasyncplans = 0; RelOptInfo *rel = best_path->path.parent; PartitionPruneInfo *partpruneinfo = NULL; int nodenumsortkeys = 0; @@ -1090,6 +1116,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) Oid *nodeSortOperators = NULL; Oid *nodeCollations = NULL; bool *nodeNullsFirst = NULL; + bool consider_async = false; /* * The subpaths list could be empty, if every child was proven empty by @@ -1153,6 +1180,11 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) tlist_was_changed = (orig_tlist_length != list_length(plan->plan.targetlist)); } + /* If appropriate, consider async append */ + consider_async = (enable_async_append && pathkeys == NIL && + !best_path->path.parallel_safe && + list_length(best_path->subpaths) > 1); + /* Build the plan for each child */ foreach(subpaths, best_path->subpaths) { @@ -1220,6 +1252,13 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) } subplans = lappend(subplans, subplan); + + /* Check to see if subplan can be executed asynchronously */ + if (consider_async && is_async_capable_path(subpath)) + { + subplan->async_capable = true; + ++nasyncplans; + } } /* @@ -1252,9 +1291,13 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags) } plan->appendplans = subplans; + plan->nasyncplans = nasyncplans; plan->first_partial_plan = best_path->first_partial_path; plan->part_prune_info = partpruneinfo; + if (nasyncplans > 0) + root->glob->asyncPlan = true; + copy_generic_path_info(&plan->plan, (Path *) best_path); /* diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index adf68d8790..95e7601a31 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -312,6 +312,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, glob->lastPlanNodeId = 0; glob->transientPlan = false; glob->dependsOnRole = false; + glob->asyncPlan = false; /* * Assess whether it's feasible to use parallel mode for this query. We @@ -513,6 +514,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, result->transientPlan = glob->transientPlan; result->dependsOnRole = glob->dependsOnRole; result->parallelModeNeeded = glob->parallelModeNeeded; + result->asyncPlan = glob->asyncPlan; result->planTree = top_plan; result->rtable = glob->finalrtable; result->resultRelations = glob->resultRelations; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index f75b52719d..58f8e0bbcf 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3999,6 +3999,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) switch (w) { + case WAIT_EVENT_APPEND_READY: + event_name = "AppendReady"; + break; case WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE: event_name = "BackupWaitWalArchive"; break; diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index eafdb1118e..507567aff3 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1111,6 +1111,16 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL, NULL }, + { + {"enable_async_append", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of async append plans."), + NULL, + GUC_EXPLAIN + }, + &enable_async_append, + true, + NULL, NULL, NULL + }, { {"geqo", PGC_USERSET, QUERY_TUNING_GEQO, gettext_noop("Enables genetic query optimization."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index bd57e917e1..1306094865 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -370,6 +370,7 @@ #enable_partitionwise_aggregate = off #enable_parallel_hash = on #enable_partition_pruning = on +#enable_async_append = on # - Planner Cost Constants - diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h index e69de29bb2..bce30417d7 100644 --- a/src/include/executor/execAsync.h +++ b/src/include/executor/execAsync.h @@ -0,0 +1,31 @@ +/*------------------------------------------------------------------------- + * execAsync.h + * Support functions for asynchronous execution + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/include/executor/execAsync.h + *------------------------------------------------------------------------- + */ + +#ifndef EXECASYNC_H +#define EXECASYNC_H + +#include "nodes/execnodes.h" + +typedef struct AsyncContext +{ + PlanState *requestor; + int request_index; +} AsyncContext; + +extern bool ExecReconsiderAsyncCapablePlan(PlanState *node, + AsyncContext *acxt); +extern void ExecAsyncRequest(AsyncRequest *areq); +extern void ExecAsyncConfigureWait(AsyncRequest *areq); +extern void ExecAsyncNotify(AsyncRequest *areq); +extern void ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result); + +#endif /* EXECASYNC_H */ diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h index cafd410a5d..8c7ebc2998 100644 --- a/src/include/executor/nodeAppend.h +++ b/src/include/executor/nodeAppend.h @@ -25,4 +25,7 @@ extern void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt); extern void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt); extern void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt); +extern void ExecReconsiderAsyncAppend(AppendState *node); +extern void ExecAsyncAppendResponse(AsyncRequest *areq); + #endif /* NODEAPPEND_H */ diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h index 6ae7733e25..56c3809d2d 100644 --- a/src/include/executor/nodeForeignscan.h +++ b/src/include/executor/nodeForeignscan.h @@ -17,6 +17,8 @@ #include "access/parallel.h" #include "nodes/execnodes.h" +struct AsyncContext; + extern ForeignScanState *ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags); extern void ExecEndForeignScan(ForeignScanState *node); extern void ExecReScanForeignScan(ForeignScanState *node); @@ -31,4 +33,10 @@ extern void ExecForeignScanInitializeWorker(ForeignScanState *node, ParallelWorkerContext *pwcxt); extern void ExecShutdownForeignScan(ForeignScanState *node); +extern bool ExecReconsiderAsyncForeignScan(ForeignScanState *node, + struct AsyncContext *acxt); +extern void ExecAsyncForeignScanRequest(AsyncRequest *areq); +extern void ExecAsyncForeignScanConfigureWait(AsyncRequest *areq); +extern void ExecAsyncForeignScanNotify(AsyncRequest *areq); + #endif /* NODEFOREIGNSCAN_H */ diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h index 248f78da45..99cabd6b94 100644 --- a/src/include/foreign/fdwapi.h +++ b/src/include/foreign/fdwapi.h @@ -19,6 +19,7 @@ /* To avoid including explain.h here, reference ExplainState thus: */ struct ExplainState; +struct AsyncContext; /* * Callback function signatures --- see fdwhandler.sgml for more info. @@ -178,6 +179,17 @@ typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root, List *fdw_private, RelOptInfo *child_rel); +typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path); + +typedef bool (*ReconsiderAsyncForeignScan_function) (ForeignScanState *node, + struct AsyncContext *acxt); + +typedef void (*ForeignAsyncRequest_function) (AsyncRequest *areq); + +typedef void (*ForeignAsyncConfigureWait_function) (AsyncRequest *areq); + +typedef void (*ForeignAsyncNotify_function) (AsyncRequest *areq); + /* * FdwRoutine is the struct returned by a foreign-data wrapper's handler * function. It provides pointers to the callback functions needed by the @@ -256,6 +268,13 @@ typedef struct FdwRoutine /* Support functions for path reparameterization. */ ReparameterizeForeignPathByChild_function ReparameterizeForeignPathByChild; + + /* Support functions for asynchronous execution */ + IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable; + ReconsiderAsyncForeignScan_function ReconsiderAsyncForeignScan; + ForeignAsyncRequest_function ForeignAsyncRequest; + ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait; + ForeignAsyncNotify_function ForeignAsyncNotify; } FdwRoutine; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index b6a88ff76b..68584b3c14 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -512,6 +512,32 @@ typedef struct ResultRelInfo struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer; } ResultRelInfo; +/* ---------------- + * AsyncRequest + * + * State for an asynchronous tuple request. + * ---------------- + */ +typedef struct AsyncRequest +{ + struct PlanState *requestor; /* Node that wants a tuple */ + struct PlanState *requestee; /* Node from which a tuple is wanted */ + int request_index; /* Scratch space for requestor */ + bool callback_pending; /* Callback is needed */ + bool request_complete; /* Request complete, result valid */ + TupleTableSlot *result; /* Result (NULL if no more tuples) */ +} AsyncRequest; + +/* + * Hash entry to store the set of IDs of ForeignScanStates that use the same + * user mapping + */ +typedef struct ForeignScanHashEntry +{ + Oid umid; /* hash key -- must be first */ + Bitmapset *fsplanids; +} ForeignScanHashEntry; + /* ---------------- * EState information * @@ -602,6 +628,14 @@ typedef struct EState /* The per-query shared memory area to use for parallel execution. */ struct dsa_area *es_query_dsa; + List *es_asyncappends; /* List of async-aware AppendStates */ + + /* + * Hash table to store the set of IDs of ForeignScanStates using the same + * user mapping + */ + HTAB *es_foreign_scan_hash; + /* * JIT information. es_jit_flags indicates whether JIT should be performed * and with which options. es_jit is created on-demand when JITing is @@ -969,6 +1003,8 @@ typedef struct PlanState */ Bitmapset *chgParam; /* set of IDs of changed Params */ + bool async_capable; + /* * Other run-time state needed by most if not all node types. */ @@ -1217,12 +1253,24 @@ struct AppendState PlanState **appendplans; /* array of PlanStates for my inputs */ int as_nplans; int as_whichplan; + bool as_syncdone; /* all synchronous plans done? */ + Bitmapset *as_asyncplans; /* asynchronous plans indexes */ + Bitmapset *as_asyncplanids; /* asynchronous plans IDs */ + int as_nasyncplans; /* # of asynchronous plans */ + AsyncRequest **as_asyncrequests; /* array of AsyncRequests */ + TupleTableSlot **as_asyncresults; /* unreturned results of async plans */ + int as_nasyncresults; /* # of valid entries in as_asyncresults */ + int as_nasyncremain; /* # of remaining async plans */ + Bitmapset *as_needrequest; /* async plans ready for a request */ + struct WaitEventSet *as_eventset; /* WaitEventSet used to configure + * file descriptor wait events */ int as_first_partial_plan; /* Index of 'appendplans' containing * the first partial plan */ ParallelAppendState *as_pstate; /* parallel coordination info */ Size pstate_len; /* size of parallel coordination info */ struct PartitionPruneState *as_prune_state; Bitmapset *as_valid_subplans; + Bitmapset *as_valid_asyncplans; bool (*choose_next_subplan) (AppendState *); }; diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index 0ec93e648c..e76db3eb4c 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -141,6 +141,8 @@ typedef struct PlannerGlobal char maxParallelHazard; /* worst PROPARALLEL hazard level */ PartitionDirectory partition_directory; /* partition descriptors */ + + bool asyncPlan; /* does plan have async-aware Append? */ } PlannerGlobal; /* macro for fetching the Plan associated with a SubPlan node */ diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 43160439f0..c636b498ef 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -59,6 +59,8 @@ typedef struct PlannedStmt bool parallelModeNeeded; /* parallel mode required to execute? */ + bool asyncPlan; /* does plan have async-aware Append? */ + int jitFlags; /* which forms of JIT should be performed */ struct Plan *planTree; /* tree of Plan nodes */ @@ -129,6 +131,11 @@ typedef struct Plan bool parallel_aware; /* engage parallel-aware logic? */ bool parallel_safe; /* OK to use as part of parallel plan? */ + /* + * information needed for asynchronous execution + */ + bool async_capable; /* engage asynchronous-capable logic? */ + /* * Common structural data for all Plan types. */ @@ -245,6 +252,7 @@ typedef struct Append Plan plan; Bitmapset *apprelids; /* RTIs of appendrel(s) formed by this node */ List *appendplans; + int nasyncplans; /* # of asynchronous plans */ /* * All 'appendplans' preceding this index are non-partial plans. All diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index ed2e4af4be..c2952e375d 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -65,6 +65,7 @@ extern PGDLLIMPORT bool enable_partitionwise_aggregate; extern PGDLLIMPORT bool enable_parallel_append; extern PGDLLIMPORT bool enable_parallel_hash; extern PGDLLIMPORT bool enable_partition_pruning; +extern PGDLLIMPORT bool enable_async_append; extern PGDLLIMPORT int constraint_exclusion; extern double index_pages_fetched(double tuples_fetched, BlockNumber pages, diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 724068cf87..d9588da38a 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -957,6 +957,7 @@ typedef enum */ typedef enum { + WAIT_EVENT_APPEND_READY, WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE = PG_WAIT_IPC, WAIT_EVENT_BGWORKER_SHUTDOWN, WAIT_EVENT_BGWORKER_STARTUP, diff --git a/src/test/regress/expected/explain.out b/src/test/regress/expected/explain.out index dc7ab2ce8b..e78ca7bddb 100644 --- a/src/test/regress/expected/explain.out +++ b/src/test/regress/expected/explain.out @@ -87,6 +87,7 @@ select explain_filter('explain (analyze, buffers, format json) select * from int "Plan": { + "Node Type": "Seq Scan", + "Parallel Aware": false, + + "Async Capable": false, + "Relation Name": "int8_tbl",+ "Alias": "i8", + "Startup Cost": N.N, + @@ -136,6 +137,7 @@ select explain_filter('explain (analyze, buffers, format xml) select * from int8 + Seq Scan + false + + false + int8_tbl + i8 + N.N + @@ -183,6 +185,7 @@ select explain_filter('explain (analyze, buffers, format yaml) select * from int - Plan: + Node Type: "Seq Scan" + Parallel Aware: false + + Async Capable: false + Relation Name: "int8_tbl"+ Alias: "i8" + Startup Cost: N.N + @@ -233,6 +236,7 @@ select explain_filter('explain (buffers, format json) select * from int8_tbl i8' "Plan": { + "Node Type": "Seq Scan", + "Parallel Aware": false, + + "Async Capable": false, + "Relation Name": "int8_tbl",+ "Alias": "i8", + "Startup Cost": N.N, + @@ -348,6 +352,7 @@ select jsonb_pretty( "Actual Rows": 0, + "Actual Loops": 0, + "Startup Cost": 0.0, + + "Async Capable": false, + "Relation Name": "tenk1", + "Parallel Aware": true, + "Local Hit Blocks": 0, + @@ -393,6 +398,7 @@ select jsonb_pretty( "Actual Rows": 0, + "Actual Loops": 0, + "Startup Cost": 0.0, + + "Async Capable": false, + "Parallel Aware": false, + "Sort Space Used": 0, + "Local Hit Blocks": 0, + @@ -435,6 +441,7 @@ select jsonb_pretty( "Actual Rows": 0, + "Actual Loops": 0, + "Startup Cost": 0.0, + + "Async Capable": false, + "Parallel Aware": false, + "Workers Planned": 0, + "Local Hit Blocks": 0, + diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out index d574583844..406fb88130 100644 --- a/src/test/regress/expected/incremental_sort.out +++ b/src/test/regress/expected/incremental_sort.out @@ -558,6 +558,7 @@ select jsonb_pretty(explain_analyze_inc_sort_nodes_without_memory('select * from "Node Type": "Incremental Sort", + "Actual Rows": 55, + "Actual Loops": 1, + + "Async Capable": false, + "Presorted Key": [ + "t.a" + ], + @@ -745,6 +746,7 @@ select jsonb_pretty(explain_analyze_inc_sort_nodes_without_memory('select * from "Node Type": "Incremental Sort", + "Actual Rows": 70, + "Actual Loops": 1, + + "Async Capable": false, + "Presorted Key": [ + "t.a" + ], + diff --git a/src/test/regress/expected/insert_conflict.out b/src/test/regress/expected/insert_conflict.out index ff157ceb1c..499245068a 100644 --- a/src/test/regress/expected/insert_conflict.out +++ b/src/test/regress/expected/insert_conflict.out @@ -204,6 +204,7 @@ explain (costs off, format json) insert into insertconflicttest values (0, 'Bilb "Node Type": "ModifyTable", + "Operation": "Insert", + "Parallel Aware": false, + + "Async Capable": false, + "Relation Name": "insertconflicttest", + "Alias": "insertconflicttest", + "Conflict Resolution": "UPDATE", + @@ -213,7 +214,8 @@ explain (costs off, format json) insert into insertconflicttest values (0, 'Bilb { + "Node Type": "Result", + "Parent Relationship": "Member", + - "Parallel Aware": false + + "Parallel Aware": false, + + "Async Capable": false + } + ] + } + diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index 81bdacf59d..b7818c0637 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -88,6 +88,7 @@ select count(*) = 1 as ok from pg_stat_wal; select name, setting from pg_settings where name like 'enable%'; name | setting --------------------------------+--------- + enable_async_append | on enable_bitmapscan | on enable_gathermerge | on enable_hashagg | on @@ -106,7 +107,7 @@ select name, setting from pg_settings where name like 'enable%'; enable_seqscan | on enable_sort | on enable_tidscan | on -(18 rows) +(19 rows) -- Test that the pg_timezone_names and pg_timezone_abbrevs views are -- more-or-less working. We can't test their contents in any great detail