diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index ee0b4acf0b..3ecb8e1e4f 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -62,6 +62,7 @@ typedef struct ConnCacheEntry
Oid serverid; /* foreign server OID used to get server name */
uint32 server_hashvalue; /* hash value of foreign server OID */
uint32 mapping_hashvalue; /* hash value of user mapping OID */
+ PgFdwConnState state; /* extra per-connection state */
} ConnCacheEntry;
/*
@@ -117,7 +118,7 @@ static bool disconnect_cached_connections(Oid serverid);
* (not even on error), we need this flag to cue manual cleanup.
*/
PGconn *
-GetConnection(UserMapping *user, bool will_prep_stmt)
+GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
{
bool found;
bool retry = false;
@@ -264,6 +265,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
/* Remember if caller will prepare statements */
entry->have_prep_stmt |= will_prep_stmt;
+ /* If caller needs access to the per-connection state, return it. */
+ if (state)
+ *state = &entry->state;
+
return entry->conn;
}
@@ -291,6 +296,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
entry->mapping_hashvalue =
GetSysCacheHashValue1(USERMAPPINGOID,
ObjectIdGetDatum(user->umid));
+ memset(&entry->state, 0, sizeof(entry->state));
/* Now try to make the connection */
entry->conn = connect_pg_server(server, user);
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 60c7e115d6..05428ee018 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -7021,7 +7021,7 @@ INSERT INTO a(aa) VALUES('aaaaa');
INSERT INTO b(aa) VALUES('bbb');
INSERT INTO b(aa) VALUES('bbbb');
INSERT INTO b(aa) VALUES('bbbbb');
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+-------
a | aaa
@@ -7049,7 +7049,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+--------
a | aaa
@@ -7077,7 +7077,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
UPDATE b SET aa = 'new';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+--------
a | aaa
@@ -7105,7 +7105,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
UPDATE a SET aa = 'newtoo';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+--------
a | newtoo
@@ -7133,7 +7133,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
(3 rows)
DELETE FROM a;
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
tableoid | aa
----------+----
(0 rows)
@@ -7175,23 +7175,28 @@ insert into bar2 values(3,33,33);
insert into bar2 values(4,44,44);
insert into bar2 values(7,77,77);
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
- QUERY PLAN
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+ QUERY PLAN
+-----------------------------------------------------------------------------------------------------------------
LockRows
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
- -> Hash Join
+ -> Merge Join
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
Inner Unique: true
- Hash Cond: (bar.f1 = foo.f1)
- -> Append
- -> Seq Scan on public.bar bar_1
+ Merge Cond: (bar.f1 = foo.f1)
+ -> Merge Append
+ Sort Key: bar.f1
+ -> Sort
Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
+ Sort Key: bar_1.f1
+ -> Seq Scan on public.bar bar_1
+ Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
-> Foreign Scan on public.bar2 bar_2
Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid
- Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
- -> Hash
+ Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR UPDATE
+ -> Sort
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+ Sort Key: foo.f1
-> HashAggregate
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
Group Key: foo.f1
@@ -7201,9 +7206,9 @@ select * from bar where f1 in (select f1 from foo) for update;
-> Foreign Scan on public.foo2 foo_2
Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+(28 rows)
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
f1 | f2
----+----
1 | 11
@@ -7213,23 +7218,28 @@ select * from bar where f1 in (select f1 from foo) for update;
(4 rows)
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
- QUERY PLAN
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+ QUERY PLAN
+----------------------------------------------------------------------------------------------------------------
LockRows
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
- -> Hash Join
+ -> Merge Join
Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
Inner Unique: true
- Hash Cond: (bar.f1 = foo.f1)
- -> Append
- -> Seq Scan on public.bar bar_1
+ Merge Cond: (bar.f1 = foo.f1)
+ -> Merge Append
+ Sort Key: bar.f1
+ -> Sort
Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
+ Sort Key: bar_1.f1
+ -> Seq Scan on public.bar bar_1
+ Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
-> Foreign Scan on public.bar2 bar_2
Output: bar_2.f1, bar_2.f2, bar_2.ctid, bar_2.*, bar_2.tableoid
- Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR SHARE
- -> Hash
+ Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR SHARE
+ -> Sort
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+ Sort Key: foo.f1
-> HashAggregate
Output: foo.ctid, foo.f1, foo.*, foo.tableoid
Group Key: foo.f1
@@ -7239,9 +7249,9 @@ select * from bar where f1 in (select f1 from foo) for share;
-> Foreign Scan on public.foo2 foo_2
Output: foo_2.ctid, foo_2.f1, foo_2.*, foo_2.tableoid
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+(28 rows)
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
f1 | f2
----+----
1 | 11
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 368997d9d1..11b19ae1ef 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -21,6 +21,7 @@
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
+#include "executor/execAsync.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "miscadmin.h"
@@ -37,9 +38,11 @@
#include "optimizer/tlist.h"
#include "parser/parsetree.h"
#include "postgres_fdw.h"
+#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/float.h"
#include "utils/guc.h"
+#include "utils/hsearch.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
@@ -159,6 +162,11 @@ typedef struct PgFdwScanState
int fetch_ct_2; /* Min(# of fetches done, 2) */
bool eof_reached; /* true if last fetch reached EOF */
+ /* for asynchronous execution */
+ Oid umid; /* Oid of user mapping */
+ PgFdwConnState *conn_state; /* extra per-connection state */
+ ForeignScanState *next_node; /* next ForeignScan node to activate */
+
/* working memory contexts */
MemoryContext batch_cxt; /* context holding current batch of tuples */
MemoryContext temp_cxt; /* context for per-tuple temporary data */
@@ -408,6 +416,12 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
RelOptInfo *input_rel,
RelOptInfo *output_rel,
void *extra);
+static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
+static bool postgresReconsiderAsyncForeignScan(ForeignScanState *node,
+ AsyncContext *acxt);
+static void postgresForeignAsyncRequest(AsyncRequest *areq);
+static void postgresForeignAsyncConfigureWait(AsyncRequest *areq);
+static void postgresForeignAsyncNotify(AsyncRequest *areq);
/*
* Helper functions
@@ -435,7 +449,11 @@ static void adjust_foreign_grouping_path_cost(PlannerInfo *root,
static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
EquivalenceClass *ec, EquivalenceMember *em,
void *arg);
+static UserMapping *get_user_mapping(EState *estate, ForeignScan *fsplan);
+static void record_foreign_scan_info(EState *estate, ForeignScanState *node,
+ UserMapping *user);
static void create_cursor(ForeignScanState *node);
+static void fetch_more_data_begin(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
static PgFdwModifyState *create_foreign_modify(EState *estate,
@@ -491,6 +509,7 @@ static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
double *totaldeadrows);
static void analyze_row_processor(PGresult *res, int row,
PgFdwAnalyzeState *astate);
+static void request_tuple_asynchronously(AsyncRequest *areq);
static HeapTuple make_tuple_from_result_row(PGresult *res,
int row,
Relation rel,
@@ -583,6 +602,13 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
/* Support functions for upper relation push-down */
routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
+ /* Support functions for asynchronous execution */
+ routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
+ routine->ReconsiderAsyncForeignScan = postgresReconsiderAsyncForeignScan;
+ routine->ForeignAsyncRequest = postgresForeignAsyncRequest;
+ routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
+ routine->ForeignAsyncNotify = postgresForeignAsyncNotify;
+
PG_RETURN_POINTER(routine);
}
@@ -1417,19 +1443,40 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
{
ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
EState *estate = node->ss.ps.state;
+ bool asyncPlan = estate->es_plannedstmt->asyncPlan;
PgFdwScanState *fsstate;
- RangeTblEntry *rte;
- Oid userid;
- ForeignTable *table;
UserMapping *user;
- int rtindex;
int numParams;
/*
- * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
+ * No need to work hard in EXPLAIN (no ANALYZE) case. In that case,
+ * node->fdw_state stays NULL; or node->fdw_state->conn stays NULL.
*/
if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
+ {
+ /* Do nothing if the query plan tree has no async-aware Appends. */
+ if (!asyncPlan)
+ return;
+
+ /* Get info about user mapping. */
+ user = get_user_mapping(estate, fsplan);
+
+ /* Record the information on the ForeignScan node in the EState. */
+ record_foreign_scan_info(estate, node, user);
+
+ /*
+ * If the ForeignScan node is async-capable, save the user mapping
+ * OID in node->fdw_state for use later.
+ */
+ if (node->ss.ps.async_capable)
+ {
+ fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
+ node->fdw_state = (void *) fsstate;
+ fsstate->umid = user->umid;
+ }
+
return;
+ }
/*
* We'll save private state in node->fdw_state.
@@ -1437,28 +1484,27 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
node->fdw_state = (void *) fsstate;
- /*
- * Identify which user to do the remote access as. This should match what
- * ExecCheckRTEPerms() does. In case of a join or aggregate, use the
- * lowest-numbered member RTE as a representative; we would get the same
- * result from any.
- */
- if (fsplan->scan.scanrelid > 0)
- rtindex = fsplan->scan.scanrelid;
- else
- rtindex = bms_next_member(fsplan->fs_relids, -1);
- rte = exec_rt_fetch(rtindex, estate);
- userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+ /* Get info about user mapping. */
+ user = get_user_mapping(estate, fsplan);
- /* Get info about foreign table. */
- table = GetForeignTable(rte->relid);
- user = GetUserMapping(userid, table->serverid);
+ if (asyncPlan)
+ {
+ /* Record the information on the ForeignScan node in the EState. */
+ record_foreign_scan_info(estate, node, user);
+
+ /*
+ * If the ForeignScan node is async-capable, save the user mapping
+ * OID in node->fdw_state for use later.
+ */
+ if (node->ss.ps.async_capable)
+ fsstate->umid = user->umid;
+ }
/*
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- fsstate->conn = GetConnection(user, false);
+ fsstate->conn = GetConnection(user, false, &fsstate->conn_state);
/* Assign a unique ID for my cursor */
fsstate->cursor_number = GetCursorNumber(fsstate->conn);
@@ -1509,6 +1555,11 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
&fsstate->param_flinfo,
&fsstate->param_exprs,
&fsstate->param_values);
+
+ /* Initialize async state */
+ fsstate->conn_state->activated = NULL;
+ fsstate->conn_state->async_query_sent = false;
+ fsstate->next_node = NULL;
}
/*
@@ -1523,8 +1574,10 @@ postgresIterateForeignScan(ForeignScanState *node)
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
/*
- * If this is the first call after Begin or ReScan, we need to create the
- * cursor on the remote side.
+ * In sync mode, if this is the first call after Begin or ReScan, we need
+ * to create the cursor on the remote side. In async mode, we would have
+ * already created the cursor before we get here, even if this is the first
+ * call after Begin or ReScan.
*/
if (!fsstate->cursor_exists)
create_cursor(node);
@@ -1534,6 +1587,9 @@ postgresIterateForeignScan(ForeignScanState *node)
*/
if (fsstate->next_tuple >= fsstate->num_tuples)
{
+ /* In async mode, just clear tuple slot. */
+ if (node->ss.ps.async_capable)
+ return ExecClearTuple(slot);
/* No point in another fetch if we already detected EOF, though. */
if (!fsstate->eof_reached)
fetch_more_data(node);
@@ -1563,6 +1619,14 @@ postgresReScanForeignScan(ForeignScanState *node)
char sql[64];
PGresult *res;
+ /* Reset async state */
+ if (node->ss.ps.async_capable)
+ {
+ fsstate->conn_state->activated = NULL;
+ fsstate->conn_state->async_query_sent = false;
+ fsstate->next_node = NULL;
+ }
+
/* If we haven't created the cursor yet, nothing to do. */
if (!fsstate->cursor_exists)
return;
@@ -1617,10 +1681,21 @@ postgresEndForeignScan(ForeignScanState *node)
{
PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
- /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
- if (fsstate == NULL)
+ /*
+ * if fsstate is NULL or if fsstate->conn is NULL, we are in EXPLAIN;
+ * nothing to do
+ */
+ if (fsstate == NULL || fsstate->conn == NULL)
return;
+ /*
+ * If we're ending before we've collected a response from an asynchronous
+ * query, we have to consume the response.
+ */
+ if (fsstate->conn_state->activated == node &&
+ fsstate->conn_state->async_query_sent)
+ fetch_more_data(node);
+
/* Close the cursor if open, to prevent accumulation of cursors */
if (fsstate->cursor_exists)
close_cursor(fsstate->conn, fsstate->cursor_number);
@@ -2491,7 +2566,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- dmstate->conn = GetConnection(user, false);
+ dmstate->conn = GetConnection(user, false, NULL);
/* Update the foreign-join-related fields. */
if (fsplan->scan.scanrelid == 0)
@@ -2872,7 +2947,7 @@ estimate_path_cost_size(PlannerInfo *root,
false, &retrieved_attrs, NULL);
/* Get the remote estimate */
- conn = GetConnection(fpinfo->user, false);
+ conn = GetConnection(fpinfo->user, false, NULL);
get_remote_estimate(sql.data, conn, &rows, &width,
&startup_cost, &total_cost);
ReleaseConnection(conn);
@@ -3428,6 +3503,53 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
return true;
}
+static UserMapping *
+get_user_mapping(EState *estate, ForeignScan *fsplan)
+{
+ int rtindex;
+ RangeTblEntry *rte;
+ Oid userid;
+ ForeignTable *table;
+
+ /*
+ * Identify which user to do the remote access as. This should match what
+ * ExecCheckRTEPerms() does. In case of a join or aggregate, use the
+ * lowest-numbered member RTE as a representative; we would get the same
+ * result from any.
+ */
+ if (fsplan->scan.scanrelid > 0)
+ rtindex = fsplan->scan.scanrelid;
+ else
+ rtindex = bms_next_member(fsplan->fs_relids, -1);
+ rte = exec_rt_fetch(rtindex, estate);
+ userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+
+ /* Get info about foreign table. */
+ table = GetForeignTable(rte->relid);
+
+ return GetUserMapping(userid, table->serverid);
+}
+
+static void
+record_foreign_scan_info(EState *estate, ForeignScanState *node,
+ UserMapping *user)
+{
+ HTAB *htab = estate->es_foreign_scan_hash;
+ int fsplanid = node->ss.ps.plan->plan_node_id;
+ bool found;
+ ForeignScanHashEntry *entry;
+
+ /* Find or create hash table entry for the user mapping. */
+ Assert(htab);
+ entry = (ForeignScanHashEntry *) hash_search(htab, &user->umid,
+ HASH_ENTER, &found);
+
+ if (!found)
+ entry->fsplanids = bms_make_singleton(fsplanid);
+ else
+ entry->fsplanids = bms_add_member(entry->fsplanids, fsplanid);
+}
+
/*
* Create cursor for node's query with current parameter values.
*/
@@ -3500,6 +3622,34 @@ create_cursor(ForeignScanState *node)
pfree(buf.data);
}
+/*
+ * Begin an asynchronous data fetch.
+ * fetch_more_data must be called to fetch the results.
+ */
+static void
+fetch_more_data_begin(ForeignScanState *node)
+{
+	PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+	PGconn	   *conn = fsstate->conn;
+	char		sql[64];
+
+	Assert(fsstate->conn_state->activated == node);
+	Assert(!fsstate->conn_state->async_query_sent);
+
+	/* Create the cursor synchronously. */
+	if (!fsstate->cursor_exists)
+		create_cursor(node);
+
+	/* Send the FETCH; PQsendQuery returns 0 (not a negative value) on failure. */
+	snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+			 fsstate->fetch_size, fsstate->cursor_number);
+
+	if (!PQsendQuery(conn, sql))
+		pgfdw_report_error(ERROR, NULL, conn, false, fsstate->query);
+
+	fsstate->conn_state->async_query_sent = true;
+}
+
/*
* Fetch some more rows from the node's cursor.
*/
@@ -3522,17 +3672,36 @@ fetch_more_data(ForeignScanState *node)
PG_TRY();
{
PGconn *conn = fsstate->conn;
- char sql[64];
int numrows;
int i;
- snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
- fsstate->fetch_size, fsstate->cursor_number);
+ if (node->ss.ps.async_capable)
+ {
+ Assert(fsstate->conn_state->activated == node);
+ Assert(fsstate->conn_state->async_query_sent);
+
+ /*
+ * The query was already sent by an earlier call to
+ * fetch_more_data_begin. So now we just fetch the result.
+ */
+ res = PQgetResult(conn);
+ /* On error, report the original query, not the FETCH. */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ }
+ else
+ {
+ char sql[64];
- res = pgfdw_exec_query(conn, sql);
- /* On error, report the original query, not the FETCH. */
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ /* This is a regular synchronous fetch. */
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fsstate->fetch_size, fsstate->cursor_number);
+
+ res = pgfdw_exec_query(conn, sql);
+ /* On error, report the original query, not the FETCH. */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ }
/* Convert the data into HeapTuples */
numrows = PQntuples(res);
@@ -3559,6 +3728,15 @@ fetch_more_data(ForeignScanState *node)
/* Must be EOF if we didn't get as many tuples as we asked for. */
fsstate->eof_reached = (numrows < fsstate->fetch_size);
+
+ /* If this was the second part of an async request, we must fetch until NULL. */
+ if (node->ss.ps.async_capable)
+ {
+			/* Drain any remaining results until PQgetResult returns NULL. */
+ while (PQgetResult(conn) != NULL)
+ ;
+ fsstate->conn_state->async_query_sent = false;
+ }
}
PG_FINALLY();
{
@@ -3684,7 +3862,7 @@ create_foreign_modify(EState *estate,
user = GetUserMapping(userid, table->serverid);
/* Open connection; report that we'll create a prepared statement. */
- fmstate->conn = GetConnection(user, true);
+ fmstate->conn = GetConnection(user, true, NULL);
fmstate->p_name = NULL; /* prepared statement not made yet */
/* Set up remote query information. */
@@ -4618,7 +4796,7 @@ postgresAnalyzeForeignTable(Relation relation,
*/
table = GetForeignTable(RelationGetRelid(relation));
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
- conn = GetConnection(user, false);
+ conn = GetConnection(user, false, NULL);
/*
* Construct command to get page count for relation.
@@ -4704,7 +4882,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
table = GetForeignTable(RelationGetRelid(relation));
server = GetForeignServer(table->serverid);
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
- conn = GetConnection(user, false);
+ conn = GetConnection(user, false, NULL);
/*
* Construct cursor that retrieves whole rows from remote.
@@ -4932,7 +5110,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
*/
server = GetForeignServer(serverOid);
mapping = GetUserMapping(GetUserId(), server->serverid);
- conn = GetConnection(mapping, false);
+ conn = GetConnection(mapping, false, NULL);
/* Don't attempt to import collation if remote server hasn't got it */
if (PQserverVersion(conn) < 90100)
@@ -6479,6 +6657,221 @@ add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel,
add_path(final_rel, (Path *) final_path);
}
+/*
+ * postgresIsForeignPathAsyncCapable
+ * Check whether a given ForeignPath node is async-capable.
+ */
+static bool
+postgresIsForeignPathAsyncCapable(ForeignPath *path)
+{
+ return true;
+}
+
+/*
+ * postgresReconsiderAsyncForeignScan
+ * Re-examine a given ForeignScan node that was planned as async-capable.
+ */
+static bool
+postgresReconsiderAsyncForeignScan(ForeignScanState *node, AsyncContext *acxt)
+{
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ EState *estate = node->ss.ps.state;
+ HTAB *htab = estate->es_foreign_scan_hash;
+ bool found;
+ ForeignScanHashEntry *entry;
+ AppendState *requestor = (AppendState *) acxt->requestor;
+ Bitmapset *asyncplanids = requestor->as_asyncplanids;
+ Bitmapset *fsplanids;
+
+ /* Find hash table entry for the ForeignScan node. */
+ Assert(htab);
+ entry = (ForeignScanHashEntry *) hash_search(htab, &fsstate->umid,
+ HASH_FIND, &found);
+ Assert(found);
+
+ fsplanids = entry->fsplanids;
+ Assert(bms_is_member(node->ss.ps.plan->plan_node_id, fsplanids));
+
+ /*
+ * If the connection used for the ForeignScan node is used in other parts
+ * of the query plan tree except async subplans of the parent Append node,
+ * disable async execution of the ForeignScan node.
+ */
+ if (!bms_is_subset(fsplanids, asyncplanids))
+ return false;
+
+ /*
+ * If the subplans of the Append node are all async-capable, and use the
+ * same connection, then we won't execute them asynchronously.
+ */
+ if (requestor->as_nasyncplans == requestor->as_nplans &&
+ !bms_nonempty_difference(asyncplanids, fsplanids))
+ return false;
+
+ return true;
+}
+
+/*
+ * postgresForeignAsyncRequest
+ * Asynchronously request next tuple from a foreign PostgreSQL table.
+ */
+static void
+postgresForeignAsyncRequest(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+
+ /*
+ * If this is the first call after Begin or ReScan, mark the connection
+ * as used by the ForeignScan node.
+ */
+ if (fsstate->conn_state->activated == NULL)
+ fsstate->conn_state->activated = node;
+
+ /*
+ * If the connection has already been used by a ForeignScan node, put it
+ * at the end of the chain of waiting ForeignScan nodes, and then return.
+ */
+ if (node != fsstate->conn_state->activated)
+ {
+ ForeignScanState *curr_node = fsstate->conn_state->activated;
+ PgFdwScanState *curr_fsstate = (PgFdwScanState *) curr_node->fdw_state;
+
+ /* Scan down the chain ... */
+ while (curr_fsstate->next_node)
+ {
+ curr_node = curr_fsstate->next_node;
+ Assert(node != curr_node);
+ curr_fsstate = (PgFdwScanState *) curr_node->fdw_state;
+ }
+ /* Update the chain linking */
+ curr_fsstate->next_node = node;
+ /* Mark the request as needing a callback */
+ areq->callback_pending = true;
+ areq->request_complete = false;
+ return;
+ }
+
+ request_tuple_asynchronously(areq);
+}
+
+/*
+ * postgresForeignAsyncConfigureWait
+ * Configure a file descriptor event for which we wish to wait.
+ */
+static void
+postgresForeignAsyncConfigureWait(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ AppendState *requestor = (AppendState *) areq->requestor;
+ WaitEventSet *set = requestor->as_eventset;
+
+ /* This function should not be called unless callback_pending */
+ Assert(areq->callback_pending);
+
+ /* If the ForeignScan node isn't activated yet, nothing to do */
+ if (fsstate->conn_state->activated != node)
+ return;
+
+ AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn),
+ NULL, areq);
+}
+
+/*
+ * postgresForeignAsyncNotify
+ * Fetch some more tuples from a file descriptor that becomes ready,
+ * requesting next tuple.
+ */
+static void
+postgresForeignAsyncNotify(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+
+ /* The core code would have initialized the callback_pending flag */
+ Assert(!areq->callback_pending);
+
+ fetch_more_data(node);
+
+ request_tuple_asynchronously(areq);
+}
+
+/*
+ * Asynchronously request next tuple from a foreign PostgreSQL table.
+ */
+static void
+request_tuple_asynchronously(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ TupleTableSlot *result;
+
+ /* Request some more tuples, if we've run out */
+ if (fsstate->next_tuple >= fsstate->num_tuples)
+ {
+ /* No point in another fetch if we already detected EOF, though */
+ if (!fsstate->eof_reached)
+ {
+ /* Begin another fetch */
+ fetch_more_data_begin(node);
+ /* Mark the request as needing a callback */
+ areq->callback_pending = true;
+ areq->request_complete = false;
+ return;
+ }
+ fsstate->conn_state->activated = NULL;
+
+ /* Activate the next ForeignScan node if any */
+ if (fsstate->next_node)
+ {
+ /* Mark the connection as used by the next ForeignScan node */
+ fsstate->conn_state->activated = fsstate->next_node;
+ Assert(!fsstate->conn_state->async_query_sent);
+ /* Begin an asynchronous fetch for that node */
+ fetch_more_data_begin(fsstate->next_node);
+ }
+
+ /* There's nothing more to do; just return a NULL pointer */
+ result = NULL;
+ /* Mark the request as complete */
+ ExecAsyncRequestDone(areq, result);
+ return;
+ }
+
+ /* Get a tuple from the ForeignScan node */
+ result = ExecProcNode((PlanState *) node);
+
+ if (TupIsNull(result))
+ {
+ Assert(fsstate->next_tuple >= fsstate->num_tuples);
+
+ /* Request some more tuples, if we've not detected EOF yet */
+ if (!fsstate->eof_reached)
+ {
+ /* Begin another fetch */
+ fetch_more_data_begin(node);
+ /* Mark the request as needing a callback */
+ areq->callback_pending = true;
+ areq->request_complete = false;
+ return;
+ }
+ fsstate->conn_state->activated = NULL;
+
+ /* Activate the next ForeignScan node if any */
+ if (fsstate->next_node)
+ {
+ /* Mark the connection as used by the next ForeignScan node */
+ fsstate->conn_state->activated = fsstate->next_node;
+ Assert(!fsstate->conn_state->async_query_sent);
+ /* Begin an asynchronous fetch for that node */
+ fetch_more_data_begin(fsstate->next_node);
+ }
+ }
+
+ /* Mark the request as complete */
+ ExecAsyncRequestDone(areq, result);
+}
+
/*
* Create a tuple from the specified row of the PGresult.
*
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 1f67b4d9fd..c3537b6449 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -16,6 +16,7 @@
#include "foreign/foreign.h"
#include "lib/stringinfo.h"
#include "libpq-fe.h"
+#include "nodes/execnodes.h"
#include "nodes/pathnodes.h"
#include "utils/relcache.h"
@@ -124,12 +125,22 @@ typedef struct PgFdwRelationInfo
int relation_index;
} PgFdwRelationInfo;
+/*
+ * Extra control information relating to a connection.
+ */
+typedef struct PgFdwConnState
+{
+ ForeignScanState *activated; /* currently-activated ForeignScan node */
+ bool async_query_sent; /* has an asynchronous query been sent? */
+} PgFdwConnState;
+
/* in postgres_fdw.c */
extern int set_transmission_modes(void);
extern void reset_transmission_modes(int nestlevel);
/* in connection.c */
-extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt,
+ PgFdwConnState **state);
extern void ReleaseConnection(PGconn *conn);
extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn);
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 151f4f1834..ceda16b92f 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -1822,31 +1822,31 @@ INSERT INTO b(aa) VALUES('bbb');
INSERT INTO b(aa) VALUES('bbbb');
INSERT INTO b(aa) VALUES('bbbbb');
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
UPDATE b SET aa = 'new';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
UPDATE a SET aa = 'newtoo';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
DELETE FROM a;
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
SELECT tableoid::regclass, * FROM b;
SELECT tableoid::regclass, * FROM ONLY a;
@@ -1882,12 +1882,12 @@ insert into bar2 values(4,44,44);
insert into bar2 values(7,77,77);
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
-- Check UPDATE with inherited target and an inherited source table
explain (verbose, costs off)
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 5ef1c7ad3c..4a9eece710 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4735,6 +4735,20 @@ ANY num_sync (
+ enable_async_append (boolean)
+
+ enable_async_append configuration parameter
+
+
+
+
+ Enables or disables the query planner's use of async-aware
+ append plan types. The default is on.
+
+
+
+
enable_bitmapscan (boolean)
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index c602ee4427..a2d2f42e28 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1563,6 +1563,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
+
+ AppendReady
+ Waiting for a subplan of Append to be ready.
+
BackupWaitWalArchive
Waiting for WAL files required for a backup to be successfully
diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index f80e379973..a2b7b8bd67 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -1390,6 +1390,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
}
if (plan->parallel_aware)
appendStringInfoString(es->str, "Parallel ");
+ if (planstate->async_capable)
+ appendStringInfoString(es->str, "Async ");
appendStringInfoString(es->str, pname);
es->indent++;
}
@@ -1409,6 +1411,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
if (custom_name)
ExplainPropertyText("Custom Plan Provider", custom_name, es);
ExplainPropertyBool("Parallel Aware", plan->parallel_aware, es);
+ ExplainPropertyBool("Async Capable", planstate->async_capable, es);
}
switch (nodeTag(plan))
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index f990c6473a..1004647d4f 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = \
execAmi.o \
+ execAsync.o \
execCurrent.o \
execExpr.o \
execExprInterp.o \
diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c
index 23bdb53cd1..613835b748 100644
--- a/src/backend/executor/execAmi.c
+++ b/src/backend/executor/execAmi.c
@@ -526,6 +526,10 @@ ExecSupportsBackwardScan(Plan *node)
{
ListCell *l;
+ /* With async, tuples may be interleaved, so can't back up. */
+ if (((Append *) node)->nasyncplans != 0)
+ return false;
+
foreach(l, ((Append *) node)->appendplans)
{
if (!ExecSupportsBackwardScan((Plan *) lfirst(l)))
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
index e69de29bb2..4e87ae6489 100644
--- a/src/backend/executor/execAsync.c
+++ b/src/backend/executor/execAsync.c
@@ -0,0 +1,138 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ * Support routines for asynchronous execution
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/nodeAppend.h"
+#include "executor/nodeForeignscan.h"
+
+static void ExecAsyncResponse(AsyncRequest *areq);
+
+/*
+ * Re-examine a plan node that was considered async-capable at plan time.
+ */
+bool
+ExecReconsiderAsyncCapablePlan(PlanState *node, AsyncContext *acxt)
+{
+ bool result;
+
+ switch (nodeTag(node))
+ {
+ case T_ForeignScanState:
+ result = ExecReconsiderAsyncForeignScan((ForeignScanState *) node,
+ acxt);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(node));
+ result = false; /* keep compiler quiet */
+ break;
+ }
+
+ return result;
+}
+
+/*
+ * Asynchronously request a tuple from a designated async-capable node.
+ */
+void
+ExecAsyncRequest(AsyncRequest *areq)
+{
+ switch (nodeTag(areq->requestee))
+ {
+ case T_ForeignScanState:
+ ExecAsyncForeignScanRequest(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestee));
+ }
+
+ ExecAsyncResponse(areq);
+}
+
+/*
+ * Give the asynchronous node a chance to configure the file descriptor event
+ * for which it wishes to wait. We expect the node-type specific callback to
+ * make a single call of the following form:
+ *
+ * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
+ */
+void
+ExecAsyncConfigureWait(AsyncRequest *areq)
+{
+ switch (nodeTag(areq->requestee))
+ {
+ case T_ForeignScanState:
+ ExecAsyncForeignScanConfigureWait(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestee));
+ }
+}
+
+/*
+ * Call the asynchronous node back when a relevant event has occurred.
+ */
+void
+ExecAsyncNotify(AsyncRequest *areq)
+{
+ switch (nodeTag(areq->requestee))
+ {
+ case T_ForeignScanState:
+ ExecAsyncForeignScanNotify(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestee));
+ }
+
+ ExecAsyncResponse(areq);
+}
+
+/*
+ * Call the requestor back when an asynchronous node has produced a result.
+ */
+static void
+ExecAsyncResponse(AsyncRequest *areq)
+{
+ switch (nodeTag(areq->requestor))
+ {
+ case T_AppendState:
+ ExecAsyncAppendResponse(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestor));
+ }
+}
+
+/*
+ * A requestee node should call this function to deliver the tuple to its
+ * requestor node. The node can call this from its ExecAsyncRequest callback
+ * if the requested tuple is available immediately.
+ */
+void
+ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
+{
+ areq->request_complete = true;
+ areq->result = result;
+}
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index c74ce36ffb..caf11dc4e0 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -48,6 +48,7 @@
#include "commands/matview.h"
#include "commands/trigger.h"
#include "executor/execdebug.h"
+#include "executor/nodeAppend.h"
#include "executor/nodeSubplan.h"
#include "foreign/fdwapi.h"
#include "jit/jit.h"
@@ -78,6 +79,8 @@ ExecutorCheckPerms_hook_type ExecutorCheckPerms_hook = NULL;
/* decls for local routines only used within this module */
static void InitPlan(QueryDesc *queryDesc, int eflags);
static void CheckValidRowMarkRel(Relation rel, RowMarkType markType);
+static void ExecBuildForeignScanHashTable(EState *estate);
+static void ExecReconsiderPlan(EState *estate);
static void ExecPostprocessPlan(EState *estate);
static void ExecEndPlan(PlanState *planstate, EState *estate);
static void ExecutePlan(EState *estate, PlanState *planstate,
@@ -886,6 +889,9 @@ InitPlan(QueryDesc *queryDesc, int eflags)
/* signal that this EState is not used for EPQ */
estate->es_epq_active = NULL;
+ if (plannedstmt->asyncPlan)
+ ExecBuildForeignScanHashTable(estate);
+
/*
* Initialize private state information for each SubPlan. We must do this
* before running ExecInitNode on the main query tree, since
@@ -924,6 +930,9 @@ InitPlan(QueryDesc *queryDesc, int eflags)
*/
planstate = ExecInitNode(plan, estate, eflags);
+ if (plannedstmt->asyncPlan)
+ ExecReconsiderPlan(estate);
+
/*
* Get the tuple descriptor describing the type of tuples to return.
*/
@@ -1321,6 +1330,35 @@ ExecGetTriggerResultRel(EState *estate, Oid relid)
return rInfo;
}
+static void
+ExecBuildForeignScanHashTable(EState *estate)
+{
+ HASHCTL ctl;
+
+ MemSet(&ctl, 0, sizeof(ctl));
+ ctl.keysize = sizeof(Oid);
+ ctl.entrysize = sizeof(ForeignScanHashEntry);
+ ctl.hcxt = CurrentMemoryContext;
+
+ estate->es_foreign_scan_hash =
+ hash_create("User mapping dependency table", 256,
+ &ctl,
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+}
+
+static void
+ExecReconsiderPlan(EState *estate)
+{
+ ListCell *lc;
+
+ foreach(lc, estate->es_asyncappends)
+ {
+ AppendState *appendstate = (AppendState *) lfirst(lc);
+
+ ExecReconsiderAsyncAppend(appendstate);
+ }
+}
+
/* ----------------------------------------------------------------
* ExecPostprocessPlan
*
diff --git a/src/backend/executor/execUtils.c b/src/backend/executor/execUtils.c
index c734283bfe..df7b9b591b 100644
--- a/src/backend/executor/execUtils.c
+++ b/src/backend/executor/execUtils.c
@@ -156,6 +156,9 @@ CreateExecutorState(void)
estate->es_use_parallel_mode = false;
+ estate->es_asyncappends = NIL;
+ estate->es_foreign_scan_hash = NULL;
+
estate->es_jit_flags = 0;
estate->es_jit = NULL;
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 15e4115bd6..3896d9fcd4 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -57,10 +57,13 @@
#include "postgres.h"
+#include "executor/execAsync.h"
#include "executor/execdebug.h"
#include "executor/execPartition.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/latch.h"
/* Shared state for parallel-aware Append. */
struct ParallelAppendState
@@ -78,12 +81,18 @@ struct ParallelAppendState
};
#define INVALID_SUBPLAN_INDEX -1
+#define EVENT_BUFFER_SIZE 16
static TupleTableSlot *ExecAppend(PlanState *pstate);
static bool choose_next_subplan_locally(AppendState *node);
static bool choose_next_subplan_for_leader(AppendState *node);
static bool choose_next_subplan_for_worker(AppendState *node);
static void mark_invalid_subplans_as_finished(AppendState *node);
+static void ExecAppendAsyncBegin(AppendState *node);
+static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
+static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
+static void ExecAppendAsyncEventWait(AppendState *node);
+static void classify_matching_subplans(AppendState *node);
/* ----------------------------------------------------------------
* ExecInitAppend
@@ -102,7 +111,10 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
AppendState *appendstate = makeNode(AppendState);
PlanState **appendplanstates;
Bitmapset *validsubplans;
+ Bitmapset *asyncplans;
+ Bitmapset *asyncplanids;
int nplans;
+ int nasyncplans;
int firstvalid;
int i,
j;
@@ -119,6 +131,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
/* Let choose_next_subplan_* function handle setting the first subplan */
appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
+ appendstate->as_syncdone = false;
/* If run-time partition pruning is enabled, then set that up now */
if (node->part_prune_info != NULL)
@@ -191,12 +204,27 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
* While at it, find out the first valid partial plan.
*/
j = 0;
+ asyncplans = NULL;
+ asyncplanids = NULL;
+ nasyncplans = 0;
firstvalid = nplans;
i = -1;
while ((i = bms_next_member(validsubplans, i)) >= 0)
{
Plan *initNode = (Plan *) list_nth(node->appendplans, i);
+ /*
+ * Record async subplans. When executing EvalPlanQual, we process
+ * async subplans synchronously, so don't do this in that case.
+ */
+ if (initNode->async_capable && estate->es_epq_active == NULL)
+ {
+ asyncplans = bms_add_member(asyncplans, j);
+ asyncplanids = bms_add_member(asyncplanids,
+ initNode->plan_node_id);
+ nasyncplans++;
+ }
+
/*
* Record the lowest appendplans index which is a valid partial plan.
*/
@@ -210,6 +238,11 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
appendstate->appendplans = appendplanstates;
appendstate->as_nplans = nplans;
+ /* Initialize async state */
+ appendstate->as_asyncplans = asyncplans;
+ appendstate->as_asyncplanids = asyncplanids;
+ appendstate->as_nasyncplans = nasyncplans;
+
/*
* Miscellaneous initialization
*/
@@ -219,6 +252,15 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
/* For parallel query, this will be overridden later. */
appendstate->choose_next_subplan = choose_next_subplan_locally;
+ /*
+ * Lastly, if there is at least one async subplan, add the Append node to
+ * estate->es_asyncappends so that we can re-examine it in
+ * ExecReconsiderPlan.
+ */
+ if (nasyncplans > 0)
+ estate->es_asyncappends = lappend(estate->es_asyncappends,
+ appendstate);
+
return appendstate;
}
@@ -232,31 +274,45 @@ static TupleTableSlot *
ExecAppend(PlanState *pstate)
{
AppendState *node = castNode(AppendState, pstate);
+ TupleTableSlot *result;
- if (node->as_whichplan < 0)
+ if (!node->as_syncdone && node->as_whichplan == INVALID_SUBPLAN_INDEX)
{
/* Nothing to do if there are no subplans */
if (node->as_nplans == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ /* If there are any async subplans, begin execution of them */
+ if (node->as_nasyncplans > 0)
+ ExecAppendAsyncBegin(node);
+
/*
- * If no subplan has been chosen, we must choose one before
+ * If no sync subplan has been chosen, we must choose one before
* proceeding.
*/
- if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
- !node->choose_next_subplan(node))
+ if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
}
for (;;)
{
PlanState *subnode;
- TupleTableSlot *result;
CHECK_FOR_INTERRUPTS();
/*
- * figure out which subplan we are currently processing
+ * try to get a tuple from any of the async subplans
+ */
+ if (!bms_is_empty(node->as_needrequest) ||
+ (node->as_syncdone && node->as_nasyncremain > 0))
+ {
+ if (ExecAppendAsyncGetNext(node, &result))
+ return result;
+ Assert(bms_is_empty(node->as_needrequest));
+ }
+
+ /*
+ * figure out which sync subplan we are currently processing
*/
Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
subnode = node->appendplans[node->as_whichplan];
@@ -276,8 +332,16 @@ ExecAppend(PlanState *pstate)
return result;
}
- /* choose new subplan; if none, we're done */
- if (!node->choose_next_subplan(node))
+ /* wait or poll async events */
+ if (node->as_nasyncremain > 0)
+ {
+ Assert(!node->as_syncdone);
+ Assert(bms_is_empty(node->as_needrequest));
+ ExecAppendAsyncEventWait(node);
+ }
+
+ /* choose new sync subplan; if no sync/async subplans, we're done */
+ if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
}
}
@@ -313,6 +377,7 @@ ExecEndAppend(AppendState *node)
void
ExecReScanAppend(AppendState *node)
{
+ int nasyncplans = node->as_nasyncplans;
int i;
/*
@@ -326,6 +391,11 @@ ExecReScanAppend(AppendState *node)
{
bms_free(node->as_valid_subplans);
node->as_valid_subplans = NULL;
+ if (nasyncplans > 0)
+ {
+ bms_free(node->as_valid_asyncplans);
+ node->as_valid_asyncplans = NULL;
+ }
}
for (i = 0; i < node->as_nplans; i++)
@@ -347,8 +417,26 @@ ExecReScanAppend(AppendState *node)
ExecReScan(subnode);
}
+ /* Reset async state */
+ if (nasyncplans > 0)
+ {
+ i = -1;
+ while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ areq->callback_pending = false;
+ areq->request_complete = false;
+ areq->result = NULL;
+ }
+
+ bms_free(node->as_needrequest);
+ node->as_needrequest = NULL;
+ }
+
/* Let choose_next_subplan_* function handle setting the first subplan */
node->as_whichplan = INVALID_SUBPLAN_INDEX;
+ node->as_syncdone = false;
}
/* ----------------------------------------------------------------
@@ -429,7 +517,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
/* ----------------------------------------------------------------
* choose_next_subplan_locally
*
- * Choose next subplan for a non-parallel-aware Append,
+ * Choose next sync subplan for a non-parallel-aware Append,
* returning false if there are no more.
* ----------------------------------------------------------------
*/
@@ -444,9 +532,9 @@ choose_next_subplan_locally(AppendState *node)
/*
* If first call then have the bms member function choose the first valid
- * subplan by initializing whichplan to -1. If there happen to be no
- * valid subplans then the bms member function will handle that by
- * returning a negative number which will allow us to exit returning a
+ * sync subplan by initializing whichplan to -1. If there happen to be
+ * no valid sync subplans then the bms member function will handle that
+ * by returning a negative number which will allow us to exit returning a
* false value.
*/
if (whichplan == INVALID_SUBPLAN_INDEX)
@@ -467,7 +555,10 @@ choose_next_subplan_locally(AppendState *node)
nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
if (nextplan < 0)
+ {
+ node->as_syncdone = true;
return false;
+ }
node->as_whichplan = nextplan;
@@ -709,3 +800,362 @@ mark_invalid_subplans_as_finished(AppendState *node)
node->as_pstate->pa_finished[i] = true;
}
}
+
+/* ----------------------------------------------------------------
+ * ExecReconsiderAsyncAppend
+ *
+ * Re-examine an async-aware Append node
+ * ----------------------------------------------------------------
+ */
+void
+ExecReconsiderAsyncAppend(AppendState *node)
+{
+ Bitmapset *asyncplans = bms_copy(node->as_asyncplans);
+ int nasyncplans;
+ AsyncRequest **asyncrequests;
+ AsyncContext acxt;
+ int i;
+
+ asyncrequests = (AsyncRequest **) palloc0(node->as_nplans *
+ sizeof(AsyncRequest *));
+
+ /* Re-examine each async subplan */
+ acxt.requestor = (PlanState *) node;
+ i = -1;
+ while ((i = bms_next_member(asyncplans, i)) >= 0)
+ {
+ PlanState *subnode = node->appendplans[i];
+
+ acxt.request_index = i;
+ if (!ExecReconsiderAsyncCapablePlan(subnode, &acxt))
+ {
+ bms_del_member(node->as_asyncplans, i);
+ bms_del_member(node->as_asyncplanids,
+ subnode->plan->plan_node_id);
+ --node->as_nasyncplans;
+ }
+ else
+ {
+ AsyncRequest *areq;
+
+ areq = palloc(sizeof(AsyncRequest));
+ areq->requestor = (PlanState *) node;
+ areq->requestee = subnode;
+ areq->request_index = i;
+ areq->callback_pending = false;
+ areq->request_complete = false;
+ areq->result = NULL;
+
+ asyncrequests[i] = areq;
+ }
+ }
+ bms_free(asyncplans);
+
+ /* No need for further processing if there are no async subplans */
+ nasyncplans = node->as_nasyncplans;
+ if (nasyncplans == 0)
+ return;
+
+ /* Initialize remaining async state */
+ node->as_asyncrequests = asyncrequests;
+ node->as_asyncresults = (TupleTableSlot **)
+ palloc0(nasyncplans * sizeof(TupleTableSlot *));
+ node->as_needrequest = NULL;
+
+ classify_matching_subplans(node);
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncBegin
+ *
+ * Begin execution of designated async-capable subplans.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecAppendAsyncBegin(AppendState *node)
+{
+ Bitmapset *valid_asyncplans;
+ int i;
+
+ /* We should never be called when there are no async subplans. */
+ Assert(node->as_nasyncplans > 0);
+
+ if (node->as_valid_subplans == NULL)
+ {
+ Assert(node->as_valid_asyncplans == NULL);
+
+ node->as_valid_subplans =
+ ExecFindMatchingSubPlans(node->as_prune_state);
+
+ classify_matching_subplans(node);
+ }
+
+ node->as_nasyncremain = 0;
+
+ /* Nothing to do if there are no valid async subplans. */
+ valid_asyncplans = node->as_valid_asyncplans;
+ if (valid_asyncplans == NULL)
+ return;
+
+ /* Make a request for each of the async subplans. */
+ i = -1;
+ while ((i = bms_next_member(valid_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ Assert(areq->request_index == i);
+ Assert(!areq->callback_pending);
+
+ /* Do the actual work. */
+ ExecAsyncRequest(areq);
+
+ ++node->as_nasyncremain;
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncGetNext
+ *
+ * Get the next tuple from any of the asynchronous subplans.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
+{
+ *result = NULL;
+
+ /* Make new async requests. */
+ if (ExecAppendAsyncRequest(node, result))
+ return true;
+
+ while (node->as_nasyncremain > 0)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ /* Wait or poll async events. */
+ ExecAppendAsyncEventWait(node);
+
+ /* Make new async requests. */
+ if (ExecAppendAsyncRequest(node, result))
+ return true;
+
+ /* Break from loop if there is any sync node that is not complete */
+ if (!node->as_syncdone)
+ break;
+ }
+
+ /*
+ * If all sync subplans are complete, we're totally done scanning the
+ * given node. Otherwise, we're done with the asynchronous stuff but
+ * must continue scanning the sync subplans.
+ */
+ if (node->as_syncdone)
+ {
+ Assert(node->as_nasyncremain == 0);
+ *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ return true;
+ }
+
+ return false;
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncRequest
+ *
+ * If there are any asynchronous subplans that need a new asynchronous
+ * request, make all of them.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
+{
+ Bitmapset *needrequest;
+ int i;
+
+ /* Nothing to do if there are no async subplans needing a new request. */
+ if (bms_is_empty(node->as_needrequest))
+ return false;
+
+ /*
+ * If there are any asynchronously-generated results that have not yet
+ * been returned, we have nothing to do; just return one of them.
+ */
+ if (node->as_nasyncresults > 0)
+ {
+ --node->as_nasyncresults;
+ *result = node->as_asyncresults[node->as_nasyncresults];
+ return true;
+ }
+
+ /* Make a new request for each of the async subplans that need it. */
+ needrequest = node->as_needrequest;
+ node->as_needrequest = NULL;
+ i = -1;
+ while ((i = bms_next_member(needrequest, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ /* Do the actual work. */
+ ExecAsyncRequest(areq);
+ }
+ bms_free(needrequest);
+
+ /* Return one of the asynchronously-generated results if any. */
+ if (node->as_nasyncresults > 0)
+ {
+ --node->as_nasyncresults;
+ *result = node->as_asyncresults[node->as_nasyncresults];
+ return true;
+ }
+
+ return false;
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncEventWait
+ *
+ * Wait or poll for file descriptor wait events and fire callbacks.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecAppendAsyncEventWait(AppendState *node)
+{
+ long timeout = node->as_syncdone ? -1 : 0;
+ WaitEvent occurred_event[EVENT_BUFFER_SIZE];
+ int noccurred;
+ int i;
+
+ /* Nothing to do if there are no remaining async subplans. */
+ if (node->as_nasyncremain == 0)
+ return;
+
+ node->as_eventset = CreateWaitEventSet(CurrentMemoryContext,
+ node->as_nasyncplans + 1);
+ AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+ NULL, NULL);
+
+ /* Give each waiting subplan a chance to add an event. */
+ i = -1;
+ while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ if (areq->callback_pending)
+ ExecAsyncConfigureWait(areq);
+ }
+
+ /* Wait for at least one event to occur. */
+ noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
+ EVENT_BUFFER_SIZE, WAIT_EVENT_APPEND_READY);
+ FreeWaitEventSet(node->as_eventset);
+ node->as_eventset = NULL;
+ if (noccurred == 0)
+ return;
+
+ /* Deliver notifications. */
+ for (i = 0; i < noccurred; i++)
+ {
+ WaitEvent *w = &occurred_event[i];
+
+ /*
+ * Each waiting subplan should have registered its wait event with
+ * user_data pointing back to its AsyncRequest.
+ */
+ if ((w->events & WL_SOCKET_READABLE) != 0)
+ {
+ AsyncRequest *areq = (AsyncRequest *) w->user_data;
+
+ /*
+ * Mark it as no longer needing a callback. We must do this
+ * before dispatching the callback in case the callback resets
+ * the flag.
+ */
+ Assert(areq->callback_pending);
+ areq->callback_pending = false;
+
+ /* Do the actual work. */
+ ExecAsyncNotify(areq);
+ }
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncAppendResponse
+ *
+ * Receive a response from an asynchronous request we made.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncAppendResponse(AsyncRequest *areq)
+{
+ AppendState *node = (AppendState *) areq->requestor;
+ TupleTableSlot *slot = areq->result;
+
+ /* The result should be a TupleTableSlot or NULL. */
+ Assert(slot == NULL || IsA(slot, TupleTableSlot));
+
+ /* Nothing to do if the request is pending. */
+ if (!areq->request_complete)
+ {
+ /*
+ * The subplan for which the request was made would be pending for a
+ * callback.
+ */
+ Assert(areq->callback_pending);
+ return;
+ }
+
+ /* If the result is NULL or an empty slot, there's nothing more to do. */
+ if (TupIsNull(slot))
+ {
+ /* The ending subplan wouldn't have been pending for a callback. */
+ Assert(!areq->callback_pending);
+ --node->as_nasyncremain;
+ return;
+ }
+
+ /* Save result so we can return it */
+ Assert(node->as_nasyncresults < node->as_nasyncplans);
+ node->as_asyncresults[node->as_nasyncresults++] = slot;
+
+ /*
+ * Mark the subplan that returned a result as ready for a new request. We
+ * don't launch another one here immediately because it might complete.
+ */
+ node->as_needrequest = bms_add_member(node->as_needrequest,
+ areq->request_index);
+}
+
+/* ----------------------------------------------------------------
+ * classify_matching_subplans
+ *
+ * Classify the node's as_valid_subplans into sync ones and
+ * async ones, adjust it to contain sync ones only, and save
+ * async ones in the node's as_valid_asyncplans
+ * ----------------------------------------------------------------
+ */
+static void
+classify_matching_subplans(AppendState *node)
+{
+ Bitmapset *valid_asyncplans;
+
+ /* Nothing to do if there are no valid subplans. */
+ if (bms_is_empty(node->as_valid_subplans))
+ return;
+
+ /* Nothing to do if there are no valid async subplans. */
+ if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
+ return;
+
+ /* Get valid async subplans. */
+ valid_asyncplans = bms_copy(node->as_asyncplans);
+ valid_asyncplans = bms_int_members(valid_asyncplans,
+ node->as_valid_subplans);
+
+ /* Adjust the valid subplans to contain sync subplans only. */
+ node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
+ valid_asyncplans);
+
+ /* Save valid async subplans. */
+ node->as_valid_asyncplans = valid_asyncplans;
+}
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 0969e53c3a..c92a35b8a6 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -22,6 +22,7 @@
*/
#include "postgres.h"
+#include "executor/execAsync.h"
#include "executor/executor.h"
#include "executor/nodeForeignscan.h"
#include "foreign/fdwapi.h"
@@ -222,6 +223,9 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
if (node->resultRelation > 0)
scanstate->resultRelInfo = estate->es_result_relations[node->resultRelation - 1];
+ /* Initialize the async_capable flag. */
+ scanstate->ss.ps.async_capable = ((Plan *) node)->async_capable;
+
/* Initialize any outer plan. */
if (outerPlan(node))
outerPlanState(scanstate) =
@@ -391,3 +395,73 @@ ExecShutdownForeignScan(ForeignScanState *node)
if (fdwroutine->ShutdownForeignScan)
fdwroutine->ShutdownForeignScan(node);
}
+
+/* ----------------------------------------------------------------
+ * ExecReconsiderAsyncForeignScan
+ *
+ * Re-examine a ForeignScan node that was considered async-capable
+ * at plan time.
+ * ----------------------------------------------------------------
+ */
+bool
+ExecReconsiderAsyncForeignScan(ForeignScanState *node, AsyncContext *acxt)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+ bool result = true;
+
+ if (fdwroutine->ReconsiderAsyncForeignScan)
+ {
+ result = fdwroutine->ReconsiderAsyncForeignScan(node, acxt);
+ if (!result)
+ node->ss.ps.async_capable = false;
+ }
+ return result;
+}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncForeignScanRequest
+ *
+ * Asynchronously request a tuple from a designated async-capable node
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanRequest(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ Assert(fdwroutine->ForeignAsyncRequest != NULL);
+ fdwroutine->ForeignAsyncRequest(areq);
+}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncForeignScanConfigureWait
+ *
+ * In async mode, configure for a wait
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanConfigureWait(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
+ fdwroutine->ForeignAsyncConfigureWait(areq);
+}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncForeignScanNotify
+ *
+ * Callback invoked when a relevant event has occurred
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanNotify(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ Assert(fdwroutine->ForeignAsyncNotify != NULL);
+ fdwroutine->ForeignAsyncNotify(areq);
+}
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 65bbc18ecb..8d64772931 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -87,6 +87,7 @@ _copyPlannedStmt(const PlannedStmt *from)
COPY_SCALAR_FIELD(transientPlan);
COPY_SCALAR_FIELD(dependsOnRole);
COPY_SCALAR_FIELD(parallelModeNeeded);
+ COPY_SCALAR_FIELD(asyncPlan);
COPY_SCALAR_FIELD(jitFlags);
COPY_NODE_FIELD(planTree);
COPY_NODE_FIELD(rtable);
@@ -120,6 +121,7 @@ CopyPlanFields(const Plan *from, Plan *newnode)
COPY_SCALAR_FIELD(plan_width);
COPY_SCALAR_FIELD(parallel_aware);
COPY_SCALAR_FIELD(parallel_safe);
+ COPY_SCALAR_FIELD(async_capable);
COPY_SCALAR_FIELD(plan_node_id);
COPY_NODE_FIELD(targetlist);
COPY_NODE_FIELD(qual);
@@ -241,6 +243,7 @@ _copyAppend(const Append *from)
*/
COPY_BITMAPSET_FIELD(apprelids);
COPY_NODE_FIELD(appendplans);
+ COPY_SCALAR_FIELD(nasyncplans);
COPY_SCALAR_FIELD(first_partial_plan);
COPY_NODE_FIELD(part_prune_info);
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index f5dcedf6e8..80a853d706 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -305,6 +305,7 @@ _outPlannedStmt(StringInfo str, const PlannedStmt *node)
WRITE_BOOL_FIELD(transientPlan);
WRITE_BOOL_FIELD(dependsOnRole);
WRITE_BOOL_FIELD(parallelModeNeeded);
+ WRITE_BOOL_FIELD(asyncPlan);
WRITE_INT_FIELD(jitFlags);
WRITE_NODE_FIELD(planTree);
WRITE_NODE_FIELD(rtable);
@@ -333,6 +334,7 @@ _outPlanInfo(StringInfo str, const Plan *node)
WRITE_INT_FIELD(plan_width);
WRITE_BOOL_FIELD(parallel_aware);
WRITE_BOOL_FIELD(parallel_safe);
+ WRITE_BOOL_FIELD(async_capable);
WRITE_INT_FIELD(plan_node_id);
WRITE_NODE_FIELD(targetlist);
WRITE_NODE_FIELD(qual);
@@ -431,6 +433,7 @@ _outAppend(StringInfo str, const Append *node)
WRITE_BITMAPSET_FIELD(apprelids);
WRITE_NODE_FIELD(appendplans);
+ WRITE_INT_FIELD(nasyncplans);
WRITE_INT_FIELD(first_partial_plan);
WRITE_NODE_FIELD(part_prune_info);
}
@@ -2221,6 +2224,7 @@ _outPlannerGlobal(StringInfo str, const PlannerGlobal *node)
WRITE_BOOL_FIELD(parallelModeOK);
WRITE_BOOL_FIELD(parallelModeNeeded);
WRITE_CHAR_FIELD(maxParallelHazard);
+ WRITE_BOOL_FIELD(asyncPlan);
}
static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 4388aae71d..5104d1a2b4 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1581,6 +1581,7 @@ _readPlannedStmt(void)
READ_BOOL_FIELD(transientPlan);
READ_BOOL_FIELD(dependsOnRole);
READ_BOOL_FIELD(parallelModeNeeded);
+ READ_BOOL_FIELD(asyncPlan);
READ_INT_FIELD(jitFlags);
READ_NODE_FIELD(planTree);
READ_NODE_FIELD(rtable);
@@ -1614,6 +1615,7 @@ ReadCommonPlan(Plan *local_node)
READ_INT_FIELD(plan_width);
READ_BOOL_FIELD(parallel_aware);
READ_BOOL_FIELD(parallel_safe);
+ READ_BOOL_FIELD(async_capable);
READ_INT_FIELD(plan_node_id);
READ_NODE_FIELD(targetlist);
READ_NODE_FIELD(qual);
@@ -1710,6 +1712,7 @@ _readAppend(void)
READ_BITMAPSET_FIELD(apprelids);
READ_NODE_FIELD(appendplans);
+ READ_INT_FIELD(nasyncplans);
READ_INT_FIELD(first_partial_plan);
READ_NODE_FIELD(part_prune_info);
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index aab06c7d21..3b034a0326 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -147,6 +147,7 @@ bool enable_partitionwise_aggregate = false;
bool enable_parallel_append = true;
bool enable_parallel_hash = true;
bool enable_partition_pruning = true;
+bool enable_async_append = true;
typedef struct
{
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index 6c8305c977..b30f8255f2 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -81,6 +81,7 @@ static List *get_gating_quals(PlannerInfo *root, List *quals);
static Plan *create_gating_plan(PlannerInfo *root, Path *path, Plan *plan,
List *gating_quals);
static Plan *create_join_plan(PlannerInfo *root, JoinPath *best_path);
+static bool is_async_capable_path(Path *path);
static Plan *create_append_plan(PlannerInfo *root, AppendPath *best_path,
int flags);
static Plan *create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
@@ -1066,6 +1067,30 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path)
return plan;
}
+/*
+ * is_async_capable_path
+ * Check whether a given Path node is async-capable.
+ */
+static bool
+is_async_capable_path(Path *path)
+{
+ switch (nodeTag(path))
+ {
+ case T_ForeignPath:
+ {
+ FdwRoutine *fdwroutine = path->parent->fdwroutine;
+
+ Assert(fdwroutine != NULL);
+ if (fdwroutine->IsForeignPathAsyncCapable != NULL &&
+ fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path))
+ return true;
+ }
+ default:
+ break;
+ }
+ return false;
+}
+
/*
* create_append_plan
* Create an Append plan for 'best_path' and (recursively) plans
@@ -1083,6 +1108,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
List *pathkeys = best_path->path.pathkeys;
List *subplans = NIL;
ListCell *subpaths;
+ int nasyncplans = 0;
RelOptInfo *rel = best_path->path.parent;
PartitionPruneInfo *partpruneinfo = NULL;
int nodenumsortkeys = 0;
@@ -1090,6 +1116,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
Oid *nodeSortOperators = NULL;
Oid *nodeCollations = NULL;
bool *nodeNullsFirst = NULL;
+ bool consider_async = false;
/*
* The subpaths list could be empty, if every child was proven empty by
@@ -1153,6 +1180,11 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
tlist_was_changed = (orig_tlist_length != list_length(plan->plan.targetlist));
}
+ /* If appropriate, consider async append */
+ consider_async = (enable_async_append && pathkeys == NIL &&
+ !best_path->path.parallel_safe &&
+ list_length(best_path->subpaths) > 1);
+
/* Build the plan for each child */
foreach(subpaths, best_path->subpaths)
{
@@ -1220,6 +1252,13 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
}
subplans = lappend(subplans, subplan);
+
+ /* Check to see if subplan can be executed asynchronously */
+ if (consider_async && is_async_capable_path(subpath))
+ {
+ subplan->async_capable = true;
+ ++nasyncplans;
+ }
}
/*
@@ -1252,9 +1291,13 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
}
plan->appendplans = subplans;
+ plan->nasyncplans = nasyncplans;
plan->first_partial_plan = best_path->first_partial_path;
plan->part_prune_info = partpruneinfo;
+ if (nasyncplans > 0)
+ root->glob->asyncPlan = true;
+
copy_generic_path_info(&plan->plan, (Path *) best_path);
/*
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index adf68d8790..95e7601a31 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -312,6 +312,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
glob->lastPlanNodeId = 0;
glob->transientPlan = false;
glob->dependsOnRole = false;
+ glob->asyncPlan = false;
/*
* Assess whether it's feasible to use parallel mode for this query. We
@@ -513,6 +514,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
result->transientPlan = glob->transientPlan;
result->dependsOnRole = glob->dependsOnRole;
result->parallelModeNeeded = glob->parallelModeNeeded;
+ result->asyncPlan = glob->asyncPlan;
result->planTree = top_plan;
result->rtable = glob->finalrtable;
result->resultRelations = glob->resultRelations;
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index f75b52719d..58f8e0bbcf 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3999,6 +3999,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
switch (w)
{
+ case WAIT_EVENT_APPEND_READY:
+ event_name = "AppendReady";
+ break;
case WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE:
event_name = "BackupWaitWalArchive";
break;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index eafdb1118e..507567aff3 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1111,6 +1111,16 @@ static struct config_bool ConfigureNamesBool[] =
true,
NULL, NULL, NULL
},
+ {
+ {"enable_async_append", PGC_USERSET, QUERY_TUNING_METHOD,
+ gettext_noop("Enables the planner's use of async append plans."),
+ NULL,
+ GUC_EXPLAIN
+ },
+ &enable_async_append,
+ true,
+ NULL, NULL, NULL
+ },
{
{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
gettext_noop("Enables genetic query optimization."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index bd57e917e1..1306094865 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -370,6 +370,7 @@
#enable_partitionwise_aggregate = off
#enable_parallel_hash = on
#enable_partition_pruning = on
+#enable_async_append = on
# - Planner Cost Constants -
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
index e69de29bb2..bce30417d7 100644
--- a/src/include/executor/execAsync.h
+++ b/src/include/executor/execAsync.h
@@ -0,0 +1,31 @@
+/*-------------------------------------------------------------------------
+ * execAsync.h
+ * Support functions for asynchronous execution
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/include/executor/execAsync.h
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+
+typedef struct AsyncContext
+{
+ PlanState *requestor;
+ int request_index;
+} AsyncContext;
+
+extern bool ExecReconsiderAsyncCapablePlan(PlanState *node,
+ AsyncContext *acxt);
+extern void ExecAsyncRequest(AsyncRequest *areq);
+extern void ExecAsyncConfigureWait(AsyncRequest *areq);
+extern void ExecAsyncNotify(AsyncRequest *areq);
+extern void ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result);
+
+#endif /* EXECASYNC_H */
diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h
index cafd410a5d..8c7ebc2998 100644
--- a/src/include/executor/nodeAppend.h
+++ b/src/include/executor/nodeAppend.h
@@ -25,4 +25,7 @@ extern void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt);
extern void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt);
extern void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt);
+extern void ExecReconsiderAsyncAppend(AppendState *node);
+extern void ExecAsyncAppendResponse(AsyncRequest *areq);
+
#endif /* NODEAPPEND_H */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index 6ae7733e25..56c3809d2d 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -17,6 +17,8 @@
#include "access/parallel.h"
#include "nodes/execnodes.h"
+struct AsyncContext;
+
extern ForeignScanState *ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags);
extern void ExecEndForeignScan(ForeignScanState *node);
extern void ExecReScanForeignScan(ForeignScanState *node);
@@ -31,4 +33,10 @@ extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
ParallelWorkerContext *pwcxt);
extern void ExecShutdownForeignScan(ForeignScanState *node);
+extern bool ExecReconsiderAsyncForeignScan(ForeignScanState *node,
+ struct AsyncContext *acxt);
+extern void ExecAsyncForeignScanRequest(AsyncRequest *areq);
+extern void ExecAsyncForeignScanConfigureWait(AsyncRequest *areq);
+extern void ExecAsyncForeignScanNotify(AsyncRequest *areq);
+
#endif /* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 248f78da45..99cabd6b94 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -19,6 +19,7 @@
/* To avoid including explain.h here, reference ExplainState thus: */
struct ExplainState;
+struct AsyncContext;
/*
* Callback function signatures --- see fdwhandler.sgml for more info.
@@ -178,6 +179,17 @@ typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root,
List *fdw_private,
RelOptInfo *child_rel);
+typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
+
+typedef bool (*ReconsiderAsyncForeignScan_function) (ForeignScanState *node,
+ struct AsyncContext *acxt);
+
+typedef void (*ForeignAsyncRequest_function) (AsyncRequest *areq);
+
+typedef void (*ForeignAsyncConfigureWait_function) (AsyncRequest *areq);
+
+typedef void (*ForeignAsyncNotify_function) (AsyncRequest *areq);
+
/*
* FdwRoutine is the struct returned by a foreign-data wrapper's handler
* function. It provides pointers to the callback functions needed by the
@@ -256,6 +268,13 @@ typedef struct FdwRoutine
/* Support functions for path reparameterization. */
ReparameterizeForeignPathByChild_function ReparameterizeForeignPathByChild;
+
+ /* Support functions for asynchronous execution */
+ IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable;
+ ReconsiderAsyncForeignScan_function ReconsiderAsyncForeignScan;
+ ForeignAsyncRequest_function ForeignAsyncRequest;
+ ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
+ ForeignAsyncNotify_function ForeignAsyncNotify;
} FdwRoutine;
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index b6a88ff76b..68584b3c14 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -512,6 +512,32 @@ typedef struct ResultRelInfo
struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer;
} ResultRelInfo;
+/* ----------------
+ * AsyncRequest
+ *
+ * State for an asynchronous tuple request.
+ * ----------------
+ */
+typedef struct AsyncRequest
+{
+ struct PlanState *requestor; /* Node that wants a tuple */
+ struct PlanState *requestee; /* Node from which a tuple is wanted */
+ int request_index; /* Scratch space for requestor */
+ bool callback_pending; /* Callback is needed */
+ bool request_complete; /* Request complete, result valid */
+ TupleTableSlot *result; /* Result (NULL if no more tuples) */
+} AsyncRequest;
+
+/*
+ * Hash entry to store the set of IDs of ForeignScanStates that use the same
+ * user mapping
+ */
+typedef struct ForeignScanHashEntry
+{
+ Oid umid; /* hash key -- must be first */
+ Bitmapset *fsplanids;
+} ForeignScanHashEntry;
+
/* ----------------
* EState information
*
@@ -602,6 +628,14 @@ typedef struct EState
/* The per-query shared memory area to use for parallel execution. */
struct dsa_area *es_query_dsa;
+ List *es_asyncappends; /* List of async-aware AppendStates */
+
+ /*
+ * Hash table to store the set of IDs of ForeignScanStates using the same
+ * user mapping
+ */
+ HTAB *es_foreign_scan_hash;
+
/*
* JIT information. es_jit_flags indicates whether JIT should be performed
* and with which options. es_jit is created on-demand when JITing is
@@ -969,6 +1003,8 @@ typedef struct PlanState
*/
Bitmapset *chgParam; /* set of IDs of changed Params */
+ bool async_capable;
+
/*
* Other run-time state needed by most if not all node types.
*/
@@ -1217,12 +1253,24 @@ struct AppendState
PlanState **appendplans; /* array of PlanStates for my inputs */
int as_nplans;
int as_whichplan;
+ bool as_syncdone; /* all synchronous plans done? */
+ Bitmapset *as_asyncplans; /* asynchronous plans indexes */
+ Bitmapset *as_asyncplanids; /* asynchronous plans IDs */
+ int as_nasyncplans; /* # of asynchronous plans */
+ AsyncRequest **as_asyncrequests; /* array of AsyncRequests */
+ TupleTableSlot **as_asyncresults; /* unreturned results of async plans */
+ int as_nasyncresults; /* # of valid entries in as_asyncresults */
+ int as_nasyncremain; /* # of remaining async plans */
+ Bitmapset *as_needrequest; /* async plans ready for a request */
+ struct WaitEventSet *as_eventset; /* WaitEventSet used to configure
+ * file descriptor wait events */
int as_first_partial_plan; /* Index of 'appendplans' containing
* the first partial plan */
ParallelAppendState *as_pstate; /* parallel coordination info */
Size pstate_len; /* size of parallel coordination info */
struct PartitionPruneState *as_prune_state;
Bitmapset *as_valid_subplans;
+ Bitmapset *as_valid_asyncplans;
bool (*choose_next_subplan) (AppendState *);
};
diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h
index 0ec93e648c..e76db3eb4c 100644
--- a/src/include/nodes/pathnodes.h
+++ b/src/include/nodes/pathnodes.h
@@ -141,6 +141,8 @@ typedef struct PlannerGlobal
char maxParallelHazard; /* worst PROPARALLEL hazard level */
PartitionDirectory partition_directory; /* partition descriptors */
+
+ bool asyncPlan; /* does plan have async-aware Append? */
} PlannerGlobal;
/* macro for fetching the Plan associated with a SubPlan node */
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 43160439f0..c636b498ef 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -59,6 +59,8 @@ typedef struct PlannedStmt
bool parallelModeNeeded; /* parallel mode required to execute? */
+ bool asyncPlan; /* does plan have async-aware Append? */
+
int jitFlags; /* which forms of JIT should be performed */
struct Plan *planTree; /* tree of Plan nodes */
@@ -129,6 +131,11 @@ typedef struct Plan
bool parallel_aware; /* engage parallel-aware logic? */
bool parallel_safe; /* OK to use as part of parallel plan? */
+ /*
+ * information needed for asynchronous execution
+ */
+ bool async_capable; /* engage asynchronous-capable logic? */
+
/*
* Common structural data for all Plan types.
*/
@@ -245,6 +252,7 @@ typedef struct Append
Plan plan;
Bitmapset *apprelids; /* RTIs of appendrel(s) formed by this node */
List *appendplans;
+ int nasyncplans; /* # of asynchronous plans */
/*
* All 'appendplans' preceding this index are non-partial plans. All
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index ed2e4af4be..c2952e375d 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -65,6 +65,7 @@ extern PGDLLIMPORT bool enable_partitionwise_aggregate;
extern PGDLLIMPORT bool enable_parallel_append;
extern PGDLLIMPORT bool enable_parallel_hash;
extern PGDLLIMPORT bool enable_partition_pruning;
+extern PGDLLIMPORT bool enable_async_append;
extern PGDLLIMPORT int constraint_exclusion;
extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 724068cf87..d9588da38a 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -957,6 +957,7 @@ typedef enum
*/
typedef enum
{
+ WAIT_EVENT_APPEND_READY,
WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE = PG_WAIT_IPC,
WAIT_EVENT_BGWORKER_SHUTDOWN,
WAIT_EVENT_BGWORKER_STARTUP,
diff --git a/src/test/regress/expected/explain.out b/src/test/regress/expected/explain.out
index dc7ab2ce8b..e78ca7bddb 100644
--- a/src/test/regress/expected/explain.out
+++ b/src/test/regress/expected/explain.out
@@ -87,6 +87,7 @@ select explain_filter('explain (analyze, buffers, format json) select * from int
"Plan": { +
"Node Type": "Seq Scan", +
"Parallel Aware": false, +
+ "Async Capable": false, +
"Relation Name": "int8_tbl",+
"Alias": "i8", +
"Startup Cost": N.N, +
@@ -136,6 +137,7 @@ select explain_filter('explain (analyze, buffers, format xml) select * from int8
+
Seq Scan +
false +
+ false +
int8_tbl +
i8 +
N.N +
@@ -183,6 +185,7 @@ select explain_filter('explain (analyze, buffers, format yaml) select * from int
- Plan: +
Node Type: "Seq Scan" +
Parallel Aware: false +
+ Async Capable: false +
Relation Name: "int8_tbl"+
Alias: "i8" +
Startup Cost: N.N +
@@ -233,6 +236,7 @@ select explain_filter('explain (buffers, format json) select * from int8_tbl i8'
"Plan": { +
"Node Type": "Seq Scan", +
"Parallel Aware": false, +
+ "Async Capable": false, +
"Relation Name": "int8_tbl",+
"Alias": "i8", +
"Startup Cost": N.N, +
@@ -348,6 +352,7 @@ select jsonb_pretty(
"Actual Rows": 0, +
"Actual Loops": 0, +
"Startup Cost": 0.0, +
+ "Async Capable": false, +
"Relation Name": "tenk1", +
"Parallel Aware": true, +
"Local Hit Blocks": 0, +
@@ -393,6 +398,7 @@ select jsonb_pretty(
"Actual Rows": 0, +
"Actual Loops": 0, +
"Startup Cost": 0.0, +
+ "Async Capable": false, +
"Parallel Aware": false, +
"Sort Space Used": 0, +
"Local Hit Blocks": 0, +
@@ -435,6 +441,7 @@ select jsonb_pretty(
"Actual Rows": 0, +
"Actual Loops": 0, +
"Startup Cost": 0.0, +
+ "Async Capable": false, +
"Parallel Aware": false, +
"Workers Planned": 0, +
"Local Hit Blocks": 0, +
diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out
index d574583844..406fb88130 100644
--- a/src/test/regress/expected/incremental_sort.out
+++ b/src/test/regress/expected/incremental_sort.out
@@ -558,6 +558,7 @@ select jsonb_pretty(explain_analyze_inc_sort_nodes_without_memory('select * from
"Node Type": "Incremental Sort", +
"Actual Rows": 55, +
"Actual Loops": 1, +
+ "Async Capable": false, +
"Presorted Key": [ +
"t.a" +
], +
@@ -745,6 +746,7 @@ select jsonb_pretty(explain_analyze_inc_sort_nodes_without_memory('select * from
"Node Type": "Incremental Sort", +
"Actual Rows": 70, +
"Actual Loops": 1, +
+ "Async Capable": false, +
"Presorted Key": [ +
"t.a" +
], +
diff --git a/src/test/regress/expected/insert_conflict.out b/src/test/regress/expected/insert_conflict.out
index ff157ceb1c..499245068a 100644
--- a/src/test/regress/expected/insert_conflict.out
+++ b/src/test/regress/expected/insert_conflict.out
@@ -204,6 +204,7 @@ explain (costs off, format json) insert into insertconflicttest values (0, 'Bilb
"Node Type": "ModifyTable", +
"Operation": "Insert", +
"Parallel Aware": false, +
+ "Async Capable": false, +
"Relation Name": "insertconflicttest", +
"Alias": "insertconflicttest", +
"Conflict Resolution": "UPDATE", +
@@ -213,7 +214,8 @@ explain (costs off, format json) insert into insertconflicttest values (0, 'Bilb
{ +
"Node Type": "Result", +
"Parent Relationship": "Member", +
- "Parallel Aware": false +
+ "Parallel Aware": false, +
+ "Async Capable": false +
} +
] +
} +
diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out
index 81bdacf59d..b7818c0637 100644
--- a/src/test/regress/expected/sysviews.out
+++ b/src/test/regress/expected/sysviews.out
@@ -88,6 +88,7 @@ select count(*) = 1 as ok from pg_stat_wal;
select name, setting from pg_settings where name like 'enable%';
name | setting
--------------------------------+---------
+ enable_async_append | on
enable_bitmapscan | on
enable_gathermerge | on
enable_hashagg | on
@@ -106,7 +107,7 @@ select name, setting from pg_settings where name like 'enable%';
enable_seqscan | on
enable_sort | on
enable_tidscan | on
-(18 rows)
+(19 rows)
-- Test that the pg_timezone_names and pg_timezone_abbrevs views are
-- more-or-less working. We can't test their contents in any great detail