From 224378c11d270aabe28bdd32efacd37ed1984bd1 Mon Sep 17 00:00:00 2001 From: Daniil Davidov Date: Mon, 7 Apr 2025 12:55:50 +0700 Subject: [PATCH v1] Meet append optimized tables --- src/backend/access/common/reloptions.c | 11 + src/backend/access/heap/heapam.c | 205 ++++++++++++++++++ src/backend/access/heap/heapam_handler.c | 5 + src/backend/access/table/tableamapi.c | 5 + src/backend/commands/explain.c | 5 +- src/backend/executor/execExpr.c | 17 +- src/backend/executor/execProcnode.c | 9 + src/backend/executor/nodeModifyTable.c | 194 ++++++++++++++++- src/backend/optimizer/plan/createplan.c | 1 + src/backend/optimizer/util/clauses.c | 28 ++- src/include/access/heapam.h | 41 ++++ src/include/access/tableam.h | 84 +++++++ src/include/nodes/execnodes.h | 6 + src/include/nodes/plannodes.h | 2 + src/include/optimizer/optimizer.h | 3 + src/include/utils/rel.h | 10 + .../regress/expected/append_optimized.out | 161 ++++++++++++++ src/test/regress/parallel_schedule | 2 + src/test/regress/sql/append_optimized.sql | 105 +++++++++ 19 files changed, 879 insertions(+), 15 deletions(-) create mode 100644 src/test/regress/expected/append_optimized.out create mode 100644 src/test/regress/sql/append_optimized.sql diff --git a/src/backend/access/common/reloptions.c b/src/backend/access/common/reloptions.c index 46c1dce222d..9652cf4179b 100644 --- a/src/backend/access/common/reloptions.c +++ b/src/backend/access/common/reloptions.c @@ -166,6 +166,15 @@ static relopt_bool boolRelOpts[] = }, true }, + { + { + "append_optimized", + "Enables using batching for insertion algorithm whenever it possible", + RELOPT_KIND_HEAP, + AccessExclusiveLock + }, + false + }, /* list terminator */ {{NULL}} }; @@ -1905,6 +1914,8 @@ default_reloptions(Datum reloptions, bool validate, relopt_kind kind) offsetof(StdRdOptions, vacuum_index_cleanup)}, {"vacuum_truncate", RELOPT_TYPE_BOOL, offsetof(StdRdOptions, vacuum_truncate), offsetof(StdRdOptions, vacuum_truncate_set)}, + {"append_optimized", RELOPT_TYPE_BOOL, + offsetof(StdRdOptions, append_optimized)}, {"vacuum_max_eager_freeze_failure_rate", RELOPT_TYPE_REAL, offsetof(StdRdOptions, vacuum_max_eager_freeze_failure_rate)} }; diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index ed2e3021799..415eef4c35d 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -51,6 +51,7 @@ #include "utils/datum.h" #include "utils/injection_point.h" #include "utils/inval.h" +#include "utils/memutils.h" #include "utils/spccache.h" #include "utils/syscache.h" @@ -106,6 +107,7 @@ static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup); static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool key_required, bool *copy); +static void heap_modify_insert_end(TableModifyState *state); /* * Each tuple lock mode has a corresponding heavyweight lock, and one or two @@ -2674,6 +2676,209 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, pgstat_count_heap_insert(relation, ntuples); } +/* + * Initialize heap modify state. 
+ */ +TableModifyState * +heap_modify_begin(Relation rel, + CommandId cid, + int options, + TableModifyBufferFlushCb buffer_flush_cb, + void *buffer_flush_ctx) +{ + TableModifyState *state; + MemoryContext context; + MemoryContext oldcontext; + + Assert(RelationIsAppendOptimized(rel)); + context = AllocSetContextCreate(TopTransactionContext, + "heap_modify memory context", + ALLOCSET_DEFAULT_SIZES); + + oldcontext = MemoryContextSwitchTo(context); + state = palloc(sizeof(TableModifyState)); + state->rel = rel; + state->cid = cid; + state->options = options; + state->mem_ctx = context; + state->buffer_flush_cb = buffer_flush_cb; + state->buffer_flush_ctx = buffer_flush_ctx; + state->data = NULL; /* To be set lazily */ + MemoryContextSwitchTo(oldcontext); + + return state; +} + +/* + * Store passed-in tuple into in-memory buffered slots. When full, insert + * multiple tuples from the buffers into heap. + */ +void +heap_modify_buffer_insert(TableModifyState *state, + TupleTableSlot *slot) +{ + HeapInsertState *istate; + HeapMultiInsertState *mistate; + MemoryContext oldcontext; + + Assert(RelationIsAppendOptimized(state->rel)); + oldcontext = MemoryContextSwitchTo(state->mem_ctx); + + /* First time through, initialize heap insert state */ + if (state->data == NULL) + { + istate = (HeapInsertState *) palloc(sizeof(HeapInsertState)); + istate->bistate = NULL; + istate->mistate = NULL; + state->data = istate; + mistate = + (HeapMultiInsertState *) palloc(sizeof(HeapMultiInsertState)); + mistate->slots = + (TupleTableSlot **) palloc0(sizeof(void *) * HEAP_MAX_BUFFERED_SLOTS); + mistate->tstore = tuplestore_begin_heap(false, false, work_mem); + mistate->nused = 0; + istate->mistate = mistate; + + /* + * heap_multi_insert() can leak memory. So switch to this memory + * context before every heap_multi_insert() call and reset when + * finished. + */ + mistate->mem_ctx = AllocSetContextCreate(CurrentMemoryContext, + "heap_multi_insert memory context", + ALLOCSET_DEFAULT_SIZES); + istate->bistate = GetBulkInsertState(); + } + + istate = (HeapInsertState *) state->data; + Assert(istate->mistate != NULL); + mistate = istate->mistate; + + tuplestore_puttupleslot(mistate->tstore, slot); + mistate->nused += 1; + + if (mistate->nused >= HEAP_MAX_BUFFERED_SLOTS) + heap_modify_buffer_flush(state); + + MemoryContextSwitchTo(oldcontext); +} + +/* + * Insert multiple tuples from in-memory buffered slots into heap. + */ +void +heap_modify_buffer_flush(TableModifyState *state) +{ + HeapInsertState *istate; + HeapMultiInsertState *mistate; + MemoryContext oldcontext; + TupleDesc tupdesc; + + Assert(RelationIsAppendOptimized(state->rel)); + + /* Quick exit if we haven't inserted anything yet */ + if (state->data == NULL) + return; + + tupdesc = RelationGetDescr(state->rel); + istate = (HeapInsertState *) state->data; + Assert(istate->mistate != NULL); + mistate = istate->mistate; + + /* Quick exit if we have flushed already */ + if (mistate->nused == 0) + return; + + for (int i = 0; i < mistate->nused; i++) + { + bool ok; + + if (istate->mistate->slots[i] == NULL) + { + istate->mistate->slots[i] = + MakeSingleTupleTableSlot(tupdesc, &TTSOpsMinimalTuple); + } + ok = tuplestore_gettupleslot(mistate->tstore, true, false, + istate->mistate->slots[i]); + Assert(ok); + } + + /* + * heap_multi_insert() can leak memory, so switch to short-lived memory + * context before calling it. 
+ */ + oldcontext = MemoryContextSwitchTo(mistate->mem_ctx); + heap_multi_insert(state->rel, + mistate->slots, + mistate->nused, + state->cid, + state->options, + istate->bistate); + MemoryContextSwitchTo(oldcontext); + MemoryContextReset(mistate->mem_ctx); + + /* + * Invoke caller-supplied buffer flush callback after inserting rows from + * the buffers to heap. + */ + if (state->buffer_flush_cb != NULL) + { + for (int i = 0; i < mistate->nused; i++) + { + state->buffer_flush_cb(state->buffer_flush_ctx, + mistate->slots[i]); + } + } + + tuplestore_clear(mistate->tstore); + mistate->nused = 0; +} + +/* + * Heap insert specific function used for performing work at the end like + * flushing remaining buffered tuples, cleaning up the insert state and tuple + * table slots used for buffered tuples etc. + */ +static void +heap_modify_insert_end(TableModifyState *state) +{ + HeapInsertState *istate; + + /* Quick exit if we haven't inserted anything yet */ + if (state->data == NULL) + return; + + istate = (HeapInsertState *) state->data; + + if (istate->mistate != NULL) + { + HeapMultiInsertState *mistate = istate->mistate; + + heap_modify_buffer_flush(state); + + Assert(mistate->nused == 0); + + for (int i = 0; i < HEAP_MAX_BUFFERED_SLOTS && mistate->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(mistate->slots[i]); + + tuplestore_end(mistate->tstore); + MemoryContextDelete(mistate->mem_ctx); + } + + if (istate->bistate != NULL) + FreeBulkInsertState(istate->bistate); +} + +/* + * Clean heap modify state. + */ +void +heap_modify_end(TableModifyState *state) +{ + heap_modify_insert_end(state); + MemoryContextDelete(state->mem_ctx); +} + /* * simple_heap_insert - insert a tuple * diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index ac082fefa77..56880165ed0 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2643,6 +2643,11 @@ static const TableAmRoutine heapam_methods = { .tuple_update = heapam_tuple_update, .tuple_lock = heapam_tuple_lock, + .tuple_modify_begin = heap_modify_begin, + .tuple_modify_buffer_insert = heap_modify_buffer_insert, + .tuple_modify_buffer_flush = heap_modify_buffer_flush, + .tuple_modify_end = heap_modify_end, + .tuple_fetch_row_version = heapam_fetch_row_version, .tuple_get_latest_tid = heap_get_latest_tid, .tuple_tid_valid = heapam_tuple_tid_valid, diff --git a/src/backend/access/table/tableamapi.c b/src/backend/access/table/tableamapi.c index 476663b66aa..ae30c5a21a8 100644 --- a/src/backend/access/table/tableamapi.c +++ b/src/backend/access/table/tableamapi.c @@ -94,6 +94,11 @@ GetTableAmRoutine(Oid amhandler) Assert(routine->scan_sample_next_block != NULL); Assert(routine->scan_sample_next_tuple != NULL); + Assert(routine->tuple_modify_begin != NULL); + Assert(routine->tuple_modify_buffer_insert != NULL); + Assert(routine->tuple_modify_buffer_flush != NULL); + Assert(routine->tuple_modify_end != NULL); + return routine; } diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index ef8aa489af8..31ce1fa7acb 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -1399,7 +1399,10 @@ ExplainNode(PlanState *planstate, List *ancestors, switch (((ModifyTable *) plan)->operation) { case CMD_INSERT: - pname = operation = "Insert"; + if (((ModifyTable *) plan)->canUseBatching) + pname = operation = "MultiInsert"; + else + pname = operation = "Insert"; break; case CMD_UPDATE: pname = operation = "Update"; diff --git 
a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c
index f1569879b52..f2d3a236fbc 100644
--- a/src/backend/executor/execExpr.c
+++ b/src/backend/executor/execExpr.c
@@ -103,7 +103,11 @@ static void ExecInitJsonCoercion(ExprState *state, JsonReturning *returning,
                                  ErrorSaveContext *escontext, bool omit_quotes,
                                  bool exists_coerce,
                                  Datum *resv, bool *resnull);
-
+/*
+ * Every time we find a volatile function during expression initialization, we
+ * must set this flag so that higher-level code can process it appropriately.
+ */
+static bool volatile_func_flag = false;
 
 /*
  * ExecInitExpr: prepare an expression tree for execution
@@ -264,6 +268,9 @@ ExecInitQual(List *qual, PlanState *parent)
     scratch.resvalue = &state->resvalue;
     scratch.resnull = &state->resnull;
 
+    /* Reset flag indicating the presence of volatile functions in qual */
+    volatile_func_flag = false;
+
     foreach_ptr(Expr, node, qual)
     {
         /* first evaluate expression */
@@ -276,6 +283,10 @@ ExecInitQual(List *qual, PlanState *parent)
                                    state->steps_len - 1);
     }
 
+    /* Possibly update information about batch-insert capability */
+    if (parent && !parent->has_volatile)
+        parent->has_volatile = volatile_func_flag;
+
     /* adjust jump targets */
     foreach_int(jump, adjust_jumps)
     {
@@ -1193,6 +1204,10 @@ ExecInitExprRec(Expr *node, ExprState *state,
             {
                 FuncExpr   *func = (FuncExpr *) node;
 
+                /* Higher-level code will handle it */
+                if (func_volatile(func->funcid))
+                    volatile_func_flag = true;
+
                 ExecInitFunc(&scratch, node,
                              func->args, func->funcid,
                              func->inputcollid, state);
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index f5f9cfbeead..2383ef7ea4b 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -416,6 +416,15 @@ ExecInitNode(Plan *node, EState *estate, int eflags)
         result->instrument = InstrAlloc(1, estate->es_instrument,
                                         result->async_capable);
 
+    /* Check whether any node below has volatile functions */
+    if ((outerPlanState(result) != NULL &&
+         outerPlanState(result)->has_volatile) ||
+        (innerPlanState(result) != NULL &&
+         innerPlanState(result)->has_volatile))
+    {
+        result->has_volatile = true;
+    }
+
     return result;
 }
 
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 309e27f8b5f..bbaf91bcbac 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -55,6 +55,7 @@
 #include "access/htup_details.h"
 #include "access/tableam.h"
 #include "access/xact.h"
+#include "catalog/pg_proc.h"
 #include "commands/trigger.h"
 #include "executor/execPartition.h"
 #include "executor/executor.h"
@@ -67,6 +68,8 @@
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
 #include "utils/datum.h"
+#include "utils/fmgroids.h"
+#include "utils/lsyscache.h"
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
 
@@ -130,6 +133,18 @@ typedef struct UpdateContext
     LockTupleMode lockmode;
 } UpdateContext;
 
+typedef struct InsertModifyBufferFlushContext
+{
+    ResultRelInfo *resultRelInfo;
+    EState     *estate;
+    ModifyTableState *mtstate;
+} InsertModifyBufferFlushContext;
+
+static InsertModifyBufferFlushContext *insert_modify_buffer_flush_context = NULL;
+static TableModifyState *table_modify_state = NULL;
+
+static void InsertModifyBufferFlushCallback(void *context,
+                                            TupleTableSlot *slot);
 
 static void ExecBatchInsert(ModifyTableState *mtstate,
                             ResultRelInfo *resultRelInfo,
@@ -174,6 +189,8 @@ static TupleTableSlot *ExecMergeNotMatched(ModifyTableContext *context,
                                            ResultRelInfo *resultRelInfo,
                                            bool canSetTag);
 
+static bool ContainVolatileFunctionsChecker(Oid func_id, void *context); +static bool IsMultiInsertCapable(ModifyTableState *mtstate); /* * Verify that the tuples to be produced by INSERT match the @@ -806,6 +823,31 @@ ExecGetUpdateNewTuple(ResultRelInfo *relinfo, return ExecProject(newProj); } +static void +InsertModifyBufferFlushCallback(void *context, TupleTableSlot *slot) +{ + InsertModifyBufferFlushContext *ctx = (InsertModifyBufferFlushContext *) context; + ResultRelInfo *resultRelInfo = ctx->resultRelInfo; + EState *estate = ctx->estate; + + /* Caller must take care of opening and closing the indices */ + + /* + * If there are any indexes, update them for all the inserted tuples, and + * run AFTER ROW INSERT triggers. + */ + if (resultRelInfo->ri_NumIndices > 0) + { + List *recheckIndexes; + + recheckIndexes = + ExecInsertIndexTuples(resultRelInfo, + slot, estate, false, + false, NULL, NIL, false); + list_free(recheckIndexes); + } +} + /* ---------------------------------------------------------------- * ExecInsert * @@ -1209,17 +1251,22 @@ ExecInsert(ModifyTableContext *context, } else { - /* insert the tuple normally */ - table_tuple_insert(resultRelationDesc, slot, - estate->es_output_cid, - 0, NULL); - - /* insert index entries for tuple */ - if (resultRelInfo->ri_NumIndices > 0) - recheckIndexes = ExecInsertIndexTuples(resultRelInfo, - slot, estate, false, - false, NULL, NIL, - false); + if (table_modify_state != NULL) + table_modify_buffer_insert(table_modify_state, slot); + else + { + /* insert the tuple normally */ + table_tuple_insert(resultRelationDesc, slot, + estate->es_output_cid, + 0, NULL); + + /* insert index entries for tuple */ + if (resultRelInfo->ri_NumIndices > 0) + recheckIndexes = ExecInsertIndexTuples(resultRelInfo, + slot, estate, false, + false, NULL, NIL, + false); + } } } @@ -4586,6 +4633,13 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) mtstate->mt_mergeActionLists = mergeActionLists; mtstate->mt_mergeJoinConditions = mergeJoinConditions; + /* + * Previous ModifyTable node execution (if any) should have released + * these resources. + */ + Assert(insert_modify_buffer_flush_context == NULL && + table_modify_state == NULL); + /*---------- * Resolve the target relation. This is the same as: * @@ -4999,6 +5053,8 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) * * We only do this for INSERT, so that for UPDATE/DELETE the batch size * remains set to 0. + * + * Also determine whether we can use batching for this INSERT command. 
     */
     if (operation == CMD_INSERT)
     {
@@ -5016,6 +5072,27 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
         }
         else
             resultRelInfo->ri_BatchSize = 1;
+
+        if (IsMultiInsertCapable(mtstate))
+        {
+            insert_modify_buffer_flush_context =
+                (InsertModifyBufferFlushContext *) palloc0(sizeof(InsertModifyBufferFlushContext));
+            insert_modify_buffer_flush_context->resultRelInfo = resultRelInfo;
+            insert_modify_buffer_flush_context->estate = estate;
+            insert_modify_buffer_flush_context->mtstate = mtstate;
+
+            Assert(estate->es_output_cid != InvalidCommandId);
+
+            table_modify_state =
+                table_modify_begin(resultRelInfo->ri_RelationDesc,
+                                   estate->es_output_cid,
+                                   0,
+                                   InsertModifyBufferFlushCallback,
+                                   insert_modify_buffer_flush_context);
+
+            /* For more accurate EXPLAIN output */
+            node->canUseBatching = true;
+        }
     }
 
     /*
@@ -5034,6 +5111,90 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
     return mtstate;
 }
 
+/*
+ * Returns true if batch insert can be performed on the target table within
+ * the current query.  We impose the following rules:
+ * 1) Batching is supported only for ordinary tables without ROW triggers
+ *    and with the append_optimized option set.
+ * 2) Batching is not supported for queries containing a RETURNING clause.
+ * 3) Batching is not supported for queries containing any volatile
+ *    functions in the plan tree.
+ * 4) Batching is supported only for tables that have no volatile default
+ *    expressions.
+ */
+static bool
+IsMultiInsertCapable(ModifyTableState *mtstate)
+{
+    ResultRelInfo *relinfo = mtstate->resultRelInfo;
+    TupleDesc   tdesc = RelationGetDescr(relinfo->ri_RelationDesc);
+    bool        has_row_triggers;
+
+    Assert(mtstate->operation == CMD_INSERT);
+
+    has_row_triggers =
+        (relinfo->ri_TrigDesc != NULL &&
+         (relinfo->ri_TrigDesc->trig_insert_after_row ||
+          relinfo->ri_TrigDesc->trig_insert_before_row ||
+          relinfo->ri_TrigDesc->trig_insert_instead_row));
+
+    /* Check conditions (1) - (3). */
+    if (!RelationIsAppendOptimized(relinfo->ri_RelationDesc) ||
+        relinfo->ri_projectReturning ||
+        has_row_triggers)
+    {
+        return false;
+    }
+
+    /* Check the last condition. */
+
+    /*
+     * Normally this flag is computed at the end of ExecInitNode processing,
+     * but we need it here already.
+     */
+    if ((outerPlanState(mtstate) != NULL &&
+         outerPlanState(mtstate)->has_volatile) ||
+        (innerPlanState(mtstate) != NULL &&
+         innerPlanState(mtstate)->has_volatile))
+    {
+        mtstate->ps.has_volatile = true;
+        return false;
+    }
+
+    for (AttrNumber i = 0; i < tdesc->natts; i++)
+    {
+        Node       *defexpr;
+
+        if (!TupleDescAttr(tdesc, i)->atthasdef)
+            continue;
+
+        defexpr = TupleDescGetDefault(tdesc, i + 1);
+        if (contain_volatile_functions_extended(defexpr,
+                                                ContainVolatileFunctionsChecker))
+        {
+            return false;
+        }
+    }
+
+    /* All conditions are met - we can perform batch insert on this table. */
+    return true;
+}
+
+/*
+ * Helper function for IsMultiInsertCapable.
+ *
+ * To decide whether we can use batching, we iterate over all default
+ * expressions of the target table and check whether they contain any
+ * volatile functions.
+ *
+ * Not all volatile functions are dangerous for batching, though, and some of
+ * them may be allowed to appear in default expressions.  For now, we allow
+ * only nextval, so that batching is not disabled just because the target
+ * table has a SERIAL field.
+ */ +static bool ContainVolatileFunctionsChecker(Oid func_id, void *context) +{ + return (func_volatile(func_id) == PROVOLATILE_VOLATILE && + func_id != F_NEXTVAL); +} + /* ---------------------------------------------------------------- * ExecEndModifyTable * @@ -5047,6 +5208,17 @@ ExecEndModifyTable(ModifyTableState *node) { int i; + if (table_modify_state != NULL) + { + Assert(node->operation == CMD_INSERT); + + table_modify_end(table_modify_state); + table_modify_state = NULL; + + pfree(insert_modify_buffer_flush_context); + insert_modify_buffer_flush_context = NULL; + } + /* * Allow any FDWs to shut down */ diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index a8f22a8c154..7bf13de1e93 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -7133,6 +7133,7 @@ make_modifytable(PlannerInfo *root, Plan *subplan, node->operation = operation; node->canSetTag = canSetTag; + node->canUseBatching = false; node->nominalRelation = nominalRelation; node->rootRelation = rootRelation; node->partColsUpdated = partColsUpdated; diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index 26a3e050086..91ee85e9157 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -157,6 +157,14 @@ static Node *substitute_actual_srf_parameters_mutator(Node *node, substitute_actual_srf_parameters_context *context); static bool pull_paramids_walker(Node *node, Bitmapset **context); +/* + * Allow user to supply specific checker for "contain_volatile_functions" call. + * In general it is not used, but for example append-optimized tables needs to + * ignore some types of volatile functions during default expressions check. + */ + +static bool contain_volatile_functions_checker(Oid func_id, void *context); +static check_function_callback checker = contain_volatile_functions_checker; /***************************************************************************** * Aggregate-function clause manipulation @@ -541,6 +549,23 @@ contain_volatile_functions(Node *clause) return contain_volatile_functions_walker(clause, NULL); } +/* + * Same as above, but allows to specify user-defined check_function_callback. + */ +bool +contain_volatile_functions_extended(Node *clause, + check_function_callback ud_checker) +{ + bool res; + check_function_callback prev_checker = checker; + + checker = ud_checker; + res = contain_volatile_functions_walker(clause, NULL); + checker = prev_checker; + + return res; +} + static bool contain_volatile_functions_checker(Oid func_id, void *context) { @@ -553,8 +578,7 @@ contain_volatile_functions_walker(Node *node, void *context) if (node == NULL) return false; /* Check for volatile functions in node itself */ - if (check_functions_in_node(node, contain_volatile_functions_checker, - context)) + if (check_functions_in_node(node, checker, context)) return true; if (IsA(node, NextValueExpr)) diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index e48fe434cd3..96b9e925e66 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -30,6 +30,7 @@ #include "storage/shm_toc.h" #include "utils/relcache.h" #include "utils/snapshot.h" +#include "utils/tuplestore.h" /* "options" flag bits for heap_insert */ @@ -270,6 +271,35 @@ typedef enum PRUNE_VACUUM_CLEANUP, /* VACUUM 2nd heap pass */ } PruneReason; +/* + * Maximum number of slots that multi-insert buffers can hold. 
+ * + * Caution: Don't make this too big, as we could end up with this many tuples + * stored in multi insert buffer. + */ +#define HEAP_MAX_BUFFERED_SLOTS 1000 + +typedef struct HeapMultiInsertState +{ + /* Array of buffered slots */ + TupleTableSlot **slots; + + /* Holds the tuple set */ + Tuplestorestate *tstore; + + /* Number of buffered tuples currently held */ + int nused; + + /* Memory context for dealing with multi inserts */ + MemoryContext mem_ctx; +} HeapMultiInsertState; + +typedef struct HeapInsertState +{ + struct BulkInsertStateData *bistate; + HeapMultiInsertState *mistate; +} HeapInsertState; + /* ---------------- * function prototypes for heap access method * @@ -320,6 +350,17 @@ extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, extern void heap_multi_insert(Relation relation, struct TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate); + +extern TableModifyState *heap_modify_begin(Relation rel, + CommandId cid, + int options, + TableModifyBufferFlushCb buffer_flush_cb, + void *buffer_flush_ctx); +extern void heap_modify_buffer_insert(TableModifyState *state, + TupleTableSlot *slot); +extern void heap_modify_buffer_flush(TableModifyState *state); +extern void heap_modify_end(TableModifyState *state); + extern TM_Result heap_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot crosscheck, bool wait, struct TM_FailureData *tmfd, bool changingPart); diff --git a/src/include/access/tableam.h b/src/include/access/tableam.h index 8713e12cbfb..3942463b715 100644 --- a/src/include/access/tableam.h +++ b/src/include/access/tableam.h @@ -248,12 +248,44 @@ typedef struct TM_IndexDeleteOp TM_IndexStatus *status; } TM_IndexDeleteOp; +struct TableModifyState; + +/* Callback invoked upon flushing each buffered tuple */ +typedef void (*TableModifyBufferFlushCb) (void *context, + TupleTableSlot *slot); + +/* Holds table modify state */ +typedef struct TableModifyState +{ + /* These fields are used for inserts for now */ + + Relation rel; /* Relation to insert to */ + CommandId cid; /* Command ID for insert */ + int options; /* TABLE_INSERT options */ + + /* Memory context for dealing with modify state variables */ + MemoryContext mem_ctx; + + /* Flush callback and its context used for multi inserts */ + TableModifyBufferFlushCb buffer_flush_cb; + void *buffer_flush_ctx; + + /* Table AM specific data */ + void *data; +} TableModifyState; + /* "options" flag bits for table_tuple_insert */ /* TABLE_INSERT_SKIP_WAL was 0x0001; RelationNeedsWAL() now governs */ #define TABLE_INSERT_SKIP_FSM 0x0002 #define TABLE_INSERT_FROZEN 0x0004 #define TABLE_INSERT_NO_LOGICAL 0x0008 +/* + * Use BAS_BULKWRITE buffer access strategy. 0x0010 is for + * HEAP_INSERT_SPECULATIVE. + */ +#define TABLE_INSERT_BAS_BULKWRITE 0x0020 + /* flag bits for table_tuple_lock */ /* Follow tuples whose update is in progress if lock modes don't conflict */ #define TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS (1 << 0) @@ -571,6 +603,21 @@ typedef struct TableAmRoutine void (*finish_bulk_insert) (Relation rel, int options); + /* ------------------------------------------------------------------------ + * Table Modify related functions. 
+ * ------------------------------------------------------------------------ + */ + TableModifyState *(*tuple_modify_begin) (Relation rel, + CommandId cid, + int options, + TableModifyBufferFlushCb buffer_flush_cb, + void *buffer_flush_ctx); + void (*tuple_modify_buffer_insert) (TableModifyState *state, + TupleTableSlot *slot); + void (*tuple_modify_buffer_flush) (TableModifyState *state); + void (*tuple_modify_end) (TableModifyState *state); + + /* ------------------------------------------------------------------------ * DDL related functionality. * ------------------------------------------------------------------------ @@ -1560,6 +1607,43 @@ table_finish_bulk_insert(Relation rel, int options) } +/* ------------------------------------------------------------------------ + * Table Modify related functions. + * ------------------------------------------------------------------------ + */ +static inline TableModifyState * +table_modify_begin(Relation rel, + CommandId cid, + int options, + TableModifyBufferFlushCb buffer_flush_cb, + void *buffer_flush_ctx) +{ + return rel->rd_tableam->tuple_modify_begin(rel, + cid, + options, + buffer_flush_cb, + buffer_flush_ctx); +} + +static inline void +table_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot) +{ + state->rel->rd_tableam->tuple_modify_buffer_insert(state, slot); +} + +static inline void +table_modify_buffer_flush(TableModifyState *state) +{ + state->rel->rd_tableam->tuple_modify_buffer_flush(state); +} + +static inline void +table_modify_end(TableModifyState *state) +{ + state->rel->rd_tableam->tuple_modify_end(state); +} + + /* ------------------------------------------------------------------------ * DDL related functionality. * ------------------------------------------------------------------------ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 5b6cadb5a6c..cbd798187eb 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1200,6 +1200,12 @@ typedef struct PlanState bool async_capable; /* true if node is async-capable */ + /* + * Qual of current node or any qual of nodes lower down the plan tree has + * at least one volatile function. + */ + bool has_volatile; + /* * Scanslot's descriptor if known. This is a bit of a hack, but otherwise * it's hard for expression compilation to optimize based on the diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 658d76225e4..3a38040d991 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -292,6 +292,8 @@ typedef struct ModifyTable CmdType operation; /* do we set the command tag/es_processed? */ bool canSetTag; + /* do we use batching during INSERT? 
*/ + bool canUseBatching; /* Parent RT index for use of EXPLAIN */ Index nominalRelation; /* Root RT index, if partitioned/inherited */ diff --git a/src/include/optimizer/optimizer.h b/src/include/optimizer/optimizer.h index 546828b54bd..9bda34d21bc 100644 --- a/src/include/optimizer/optimizer.h +++ b/src/include/optimizer/optimizer.h @@ -22,6 +22,7 @@ #ifndef OPTIMIZER_H #define OPTIMIZER_H +#include "nodes/nodeFuncs.h" #include "nodes/parsenodes.h" /* @@ -142,6 +143,8 @@ extern Expr *canonicalize_qual(Expr *qual, bool is_check); extern bool contain_mutable_functions(Node *clause); extern bool contain_mutable_functions_after_planning(Expr *expr); extern bool contain_volatile_functions(Node *clause); +extern bool contain_volatile_functions_extended(Node *clause, + check_function_callback ud_checker); extern bool contain_volatile_functions_after_planning(Expr *expr); extern bool contain_volatile_functions_not_nextval(Node *clause); diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index b552359915f..e548954d81d 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -348,6 +348,7 @@ typedef struct StdRdOptions StdRdOptIndexCleanup vacuum_index_cleanup; /* controls index vacuuming */ bool vacuum_truncate; /* enables vacuum to truncate a relation */ bool vacuum_truncate_set; /* whether vacuum_truncate is set */ + bool append_optimized; /* use optimized insertion algorithm */ /* * Fraction of pages in a relation that vacuum can eagerly scan and fail @@ -367,6 +368,15 @@ typedef struct StdRdOptions ((relation)->rd_options ? \ ((StdRdOptions *) (relation)->rd_options)->toast_tuple_target : (defaulttarg)) +/* + * RelationIsAppendOptimized + * Check whether relation can use batching for insertion + */ + #define RelationIsAppendOptimized(relation) \ + (AssertMacro(RelationIsValid(relation)), \ + (relation)->rd_options ? \ + ((StdRdOptions *) (relation)->rd_options)->append_optimized : false) + /* * RelationGetFillFactor * Returns the relation's fillfactor. Note multiple eval of argument! diff --git a/src/test/regress/expected/append_optimized.out b/src/test/regress/expected/append_optimized.out new file mode 100644 index 00000000000..57b45a20e61 --- /dev/null +++ b/src/test/regress/expected/append_optimized.out @@ -0,0 +1,161 @@ +-- Not all INSERT queries are suitable for using batching. All conditions are +-- listed in nodeModifyTable.c +-- In this test we want to check whether append_optimized table correcly +-- determines when to use batching. +CREATE TABLE optimized_tbl ( + int_data INT DEFAULT random() +) WITH (append_optimized=true); +CREATE TABLE rows_source (int_data INT); +INSERT INTO rows_source SELECT generate_series(1, 10); +-- Must not use batching here, because optimized_tbl has volatile function +-- whithin default expression. +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source; + QUERY PLAN +--------------------------------------------------------------------- + Insert on optimized_tbl (cost=0.00..35.50 rows=0 width=0) + -> Seq Scan on rows_source (cost=0.00..35.50 rows=2550 width=4) +(2 rows) + +-- Now default expression not prevent us from using batching. 
+ALTER TABLE optimized_tbl ALTER COLUMN int_data SET DEFAULT 0; +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source; + QUERY PLAN +--------------------------------------------------------------------- + MultiInsert on optimized_tbl (cost=0.00..35.50 rows=0 width=0) + -> Seq Scan on rows_source (cost=0.00..35.50 rows=2550 width=4) +(2 rows) + +-- Must not use batching here, because WHERE clause contains volatile function. +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source + WHERE int_data > random(); + QUERY PLAN +-------------------------------------------------------------------- + Insert on optimized_tbl (cost=0.00..54.63 rows=0 width=0) + -> Seq Scan on rows_source (cost=0.00..54.63 rows=850 width=4) + Filter: ((int_data)::double precision > random()) +(3 rows) + +-- Now WHERE clause not prevent us from using batching. +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source + WHERE int_data > 2; + QUERY PLAN +-------------------------------------------------------------------- + MultiInsert on optimized_tbl (cost=0.00..41.88 rows=0 width=0) + -> Seq Scan on rows_source (cost=0.00..41.88 rows=850 width=4) + Filter: (int_data > 2) +(3 rows) + +-- Create ROW trigger on optimized_tbl. +CREATE OR REPLACE FUNCTION my_trigger_function() +RETURNS TRIGGER AS $$ +BEGIN + NEW.int_data := NEW.int_data * 10; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; +CREATE TRIGGER my_row_trigger +BEFORE INSERT ON optimized_tbl +FOR EACH ROW +EXECUTE FUNCTION my_trigger_function(); +-- Must not use batching here, because optimized_tbl has ROW trigger. +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source; + QUERY PLAN +--------------------------------------------------------------------- + Insert on optimized_tbl (cost=0.00..35.50 rows=0 width=0) + -> Seq Scan on rows_source (cost=0.00..35.50 rows=2550 width=4) +(2 rows) + +DROP TRIGGER my_row_trigger ON optimized_tbl; +DROP FUNCTION my_trigger_function(); +-- Must not use batching here, because RETURNING clause is specified. +EXPLAIN INSERT INTO optimized_tbl VALUES (100) RETURNING int_data; + QUERY PLAN +----------------------------------------------------------- + Insert on optimized_tbl (cost=0.00..0.01 rows=1 width=4) + -> Result (cost=0.00..0.01 rows=1 width=4) +(2 rows) + +-- Now RETURNING not prevent us from using batching. +EXPLAIN INSERT INTO optimized_tbl VALUES (100); + QUERY PLAN +---------------------------------------------------------------- + MultiInsert on optimized_tbl (cost=0.00..0.01 rows=0 width=0) + -> Result (cost=0.00..0.01 rows=1 width=4) +(2 rows) + +TRUNCATE optimized_tbl; +CREATE INDEX idx_test_int_data ON optimized_tbl (int_data); +-- Fill source table with more data, so there will be several buffers flushs +-- during INSERT opration. +INSERT INTO rows_source SELECT generate_series(11, 10000); +-- It is OK to use batching. +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source; + QUERY PLAN +----------------------------------------------------------------------- + MultiInsert on optimized_tbl (cost=0.00..159.75 rows=0 width=0) + -> Seq Scan on rows_source (cost=0.00..159.75 rows=11475 width=4) +(2 rows) + +INSERT INTO optimized_tbl +SELECT int_data FROM rows_source; +-- Check whether both index and table contains all inserted rows. 
+SELECT COUNT(*) FROM optimized_tbl; + count +------- + 10000 +(1 row) + +ANALYZE optimized_tbl; +SELECT c.relname, c.reltuples +FROM pg_class c +JOIN pg_index i ON c.oid = i.indexrelid +WHERE i.indrelid = 'optimized_tbl'::regclass; + relname | reltuples +-------------------+----------- + idx_test_int_data | 10000 +(1 row) + +-- We allow to use SERIAL field in append_optimized table. Check whether such +-- fields behave correctly. +CREATE TABLE test_serial( + id SERIAL, + int_data INT +) WITH (append_optimized=true); +CREATE TABLE small_source(int_data INT); +INSERT INTO small_source SELECT generate_series(1, 10); +EXPLAIN INSERT INTO test_serial(int_data) + SELECT int_data FROM small_source; + QUERY PLAN +---------------------------------------------------------------------- + MultiInsert on test_serial (cost=0.00..48.25 rows=0 width=0) + -> Seq Scan on small_source (cost=0.00..48.25 rows=2550 width=8) +(2 rows) + +INSERT INTO test_serial(int_data) +SELECT int_data FROM small_source; +SELECT * FROM test_serial; + id | int_data +----+---------- + 1 | 1 + 2 | 2 + 3 | 3 + 4 | 4 + 5 | 5 + 6 | 6 + 7 | 7 + 8 | 8 + 9 | 9 + 10 | 10 +(10 rows) + +-- Cleanup +DROP TABLE optimized_tbl; +DROP TABLE rows_source; +DROP TABLE test_serial; +DROP TABLE small_source; diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule index 0a35f2f8f6a..0cda71a358d 100644 --- a/src/test/regress/parallel_schedule +++ b/src/test/regress/parallel_schedule @@ -136,3 +136,5 @@ test: fast_default # run tablespace test at the end because it drops the tablespace created during # setup that other tests may use. test: tablespace + +test: append_optimized diff --git a/src/test/regress/sql/append_optimized.sql b/src/test/regress/sql/append_optimized.sql new file mode 100644 index 00000000000..ce3ffab2d52 --- /dev/null +++ b/src/test/regress/sql/append_optimized.sql @@ -0,0 +1,105 @@ +-- Not all INSERT queries are suitable for using batching. All conditions are +-- listed in nodeModifyTable.c +-- In this test we want to check whether append_optimized table correcly +-- determines when to use batching. + +CREATE TABLE optimized_tbl ( + int_data INT DEFAULT random() +) WITH (append_optimized=true); + +CREATE TABLE rows_source (int_data INT); +INSERT INTO rows_source SELECT generate_series(1, 10); + +-- Must not use batching here, because optimized_tbl has volatile function +-- whithin default expression. +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source; + +-- Now default expression not prevent us from using batching. +ALTER TABLE optimized_tbl ALTER COLUMN int_data SET DEFAULT 0; +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source; + +-- Must not use batching here, because WHERE clause contains volatile function. +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source + WHERE int_data > random(); + +-- Now WHERE clause not prevent us from using batching. +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source + WHERE int_data > 2; + +-- Create ROW trigger on optimized_tbl. +CREATE OR REPLACE FUNCTION my_trigger_function() +RETURNS TRIGGER AS $$ +BEGIN + NEW.int_data := NEW.int_data * 10; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER my_row_trigger +BEFORE INSERT ON optimized_tbl +FOR EACH ROW +EXECUTE FUNCTION my_trigger_function(); + +-- Must not use batching here, because optimized_tbl has ROW trigger. 
+EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source; + +DROP TRIGGER my_row_trigger ON optimized_tbl; +DROP FUNCTION my_trigger_function(); + +-- Must not use batching here, because RETURNING clause is specified. +EXPLAIN INSERT INTO optimized_tbl VALUES (100) RETURNING int_data; + +-- Now RETURNING not prevent us from using batching. +EXPLAIN INSERT INTO optimized_tbl VALUES (100); + +TRUNCATE optimized_tbl; +CREATE INDEX idx_test_int_data ON optimized_tbl (int_data); + +-- Fill source table with more data, so there will be several buffers flushs +-- during INSERT opration. +INSERT INTO rows_source SELECT generate_series(11, 10000); + +-- It is OK to use batching. +EXPLAIN INSERT INTO optimized_tbl + SELECT int_data FROM rows_source; + +INSERT INTO optimized_tbl +SELECT int_data FROM rows_source; + +-- Check whether both index and table contains all inserted rows. +SELECT COUNT(*) FROM optimized_tbl; +ANALYZE optimized_tbl; + +SELECT c.relname, c.reltuples +FROM pg_class c +JOIN pg_index i ON c.oid = i.indexrelid +WHERE i.indrelid = 'optimized_tbl'::regclass; + +-- We allow to use SERIAL field in append_optimized table. Check whether such +-- fields behave correctly. +CREATE TABLE test_serial( + id SERIAL, + int_data INT +) WITH (append_optimized=true); + +CREATE TABLE small_source(int_data INT); +INSERT INTO small_source SELECT generate_series(1, 10); + +EXPLAIN INSERT INTO test_serial(int_data) + SELECT int_data FROM small_source; + +INSERT INTO test_serial(int_data) +SELECT int_data FROM small_source; + +SELECT * FROM test_serial; + +-- Cleanup +DROP TABLE optimized_tbl; +DROP TABLE rows_source; +DROP TABLE test_serial; +DROP TABLE small_source; -- 2.43.0
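
Below are a few illustrative sketches for reviewers; none of them are part of the patch. First, a condensed view of how the new table_modify_* API is meant to be driven by a caller. The names IndexFlushContext, index_flush_cb and insert_buffered are made up for illustration, and error handling is omitted. Two properties of the patch it tries to highlight: heap_modify_begin() asserts that the relation has the append_optimized reloption set, and per-tuple work such as index insertion has to happen in the flush callback, which runs only after heap_multi_insert() has written the batch (and thus assigned TIDs to the buffered slots).

/*
 * Illustrative only -- not part of the patch.  Sketch of a caller driving
 * the new table_modify_* API; IndexFlushContext, index_flush_cb and
 * insert_buffered are hypothetical names.
 */
#include "postgres.h"

#include "access/tableam.h"
#include "executor/executor.h"
#include "nodes/execnodes.h"

typedef struct IndexFlushContext
{
	ResultRelInfo *rri;			/* target relation, indexes already opened */
	EState	   *estate;
} IndexFlushContext;

/* Called once per tuple, after the batch containing it has been written. */
static void
index_flush_cb(void *context, TupleTableSlot *slot)
{
	IndexFlushContext *ctx = (IndexFlushContext *) context;

	if (ctx->rri->ri_NumIndices > 0)
	{
		List	   *recheck;

		recheck = ExecInsertIndexTuples(ctx->rri, slot, ctx->estate,
										false, false, NULL, NIL, false);
		list_free(recheck);
	}
}

static void
insert_buffered(Relation rel, TupleTableSlot **slots, int ntuples,
				CommandId cid, IndexFlushContext *ctx)
{
	/* heap_modify_begin() asserts RelationIsAppendOptimized(rel) */
	TableModifyState *state = table_modify_begin(rel, cid, 0,
												 index_flush_cb, ctx);

	for (int i = 0; i < ntuples; i++)
		table_modify_buffer_insert(state, slots[i]);	/* flushes after every
														 * HEAP_MAX_BUFFERED_SLOTS
														 * buffered tuples */

	/* flushes the remaining tail and releases all per-state memory */
	table_modify_end(state);
}

With HEAP_MAX_BUFFERED_SLOTS at 1000, inserting 2500 rows through this path results in three heap_multi_insert() calls: two full batches triggered from table_modify_buffer_insert() and a final batch of 500 flushed by table_modify_end().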
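
Second, a minimal, hedged restatement of the new contain_volatile_functions_extended() entry point in clauses.c, detached from the executor code that uses it. The checker mirrors the rule applied to column defaults in nodeModifyTable.c (any volatile function other than nextval() disqualifies the expression); allow_nextval_checker and default_blocks_batching are hypothetical names.

/*
 * Illustrative only.  A caller-supplied checker for
 * contain_volatile_functions_extended(): report a volatile function unless
 * it is nextval(), so SERIAL defaults do not disable batching.
 */
#include "postgres.h"

#include "catalog/pg_proc.h"
#include "optimizer/optimizer.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"

static bool
allow_nextval_checker(Oid func_id, void *context)
{
	return (func_volatile(func_id) == PROVOLATILE_VOLATILE &&
			func_id != F_NEXTVAL);
}

/* true if the default expression should disqualify the table from batching */
static bool
default_blocks_batching(Node *defexpr)
{
	return contain_volatile_functions_extended(defexpr, allow_nextval_checker);
}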
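
Finally, a consequence of the tableamapi.c hunk: GetTableAmRoutine() now asserts that all four tuple_modify_* callbacks are provided, so every table access method must supply them even if it never batches. A sketch of the minimum a non-batching AM could provide is below; the AM name my_am is hypothetical, and erroring out as the contract for such AMs is an assumption, not something the patch states.

/*
 * Illustrative only.  Minimal tuple_modify_* callbacks for a hypothetical
 * table AM ("my_am") with no batching support; GetTableAmRoutine() now
 * requires all four to be set.
 */
#include "postgres.h"

#include "access/tableam.h"

static TableModifyState *
my_am_modify_begin(Relation rel, CommandId cid, int options,
				   TableModifyBufferFlushCb buffer_flush_cb,
				   void *buffer_flush_ctx)
{
	ereport(ERROR,
			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
			 errmsg("my_am does not support batched modifications")));
	return NULL;				/* keep compiler quiet */
}

static void
my_am_modify_buffer_insert(TableModifyState *state, TupleTableSlot *slot)
{
	elog(ERROR, "unreachable: my_am never creates a TableModifyState");
}

static void
my_am_modify_buffer_flush(TableModifyState *state)
{
	elog(ERROR, "unreachable: my_am never creates a TableModifyState");
}

static void
my_am_modify_end(TableModifyState *state)
{
	elog(ERROR, "unreachable: my_am never creates a TableModifyState");
}

/*
 * These would then be wired into the AM's TableAmRoutine, mirroring the
 * heapam_handler.c hunk above:
 *
 *     .tuple_modify_begin = my_am_modify_begin,
 *     .tuple_modify_buffer_insert = my_am_modify_buffer_insert,
 *     .tuple_modify_buffer_flush = my_am_modify_buffer_flush,
 *     .tuple_modify_end = my_am_modify_end,
 */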