From d3f0c64e85417e6fcf164656481ea80732b9bd87 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Tue, 23 Apr 2024 04:15:49 +0000 Subject: [PATCH v19 3/6] Optimize INSERT INTO ... SELECT with multi inserts --- contrib/test_decoding/expected/stream.out | 2 +- src/backend/executor/nodeModifyTable.c | 177 +++++++++++++++++++--- src/tools/pgindent/typedefs.list | 1 + 3 files changed, 161 insertions(+), 19 deletions(-) diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out index 4ab2d47bf8..c19facb3c9 100644 --- a/contrib/test_decoding/expected/stream.out +++ b/contrib/test_decoding/expected/stream.out @@ -101,10 +101,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl streaming change for transaction streaming change for transaction streaming change for transaction - streaming change for transaction closing a streamed block for transaction opening a streamed block for transaction streaming change for transaction + streaming change for transaction closing a streamed block for transaction committing streamed transaction (17 rows) diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index cee60d3659..434e3f8411 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -114,6 +114,19 @@ typedef struct UpdateContext LockTupleMode lockmode; } UpdateContext; +typedef struct InsertModifyBufferFlushContext +{ + ResultRelInfo *resultRelInfo; + EState *estate; + ModifyTableState *mtstate; +} InsertModifyBufferFlushContext; + +static InsertModifyBufferFlushContext *insert_modify_buffer_flush_context = NULL; +static TableModifyState *table_modify_state = NULL; + +static void InsertModifyBufferFlushCallback(void *context, + TupleTableSlot **slots, + int nslots); static void ExecBatchInsert(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, @@ -726,6 +739,61 @@ ExecGetUpdateNewTuple(ResultRelInfo *relinfo, return ExecProject(newProj); } +static void +InsertModifyBufferFlushCallback(void *context, TupleTableSlot **slots, int nslots) +{ + InsertModifyBufferFlushContext *ctx = (InsertModifyBufferFlushContext *) context; + ResultRelInfo *resultRelInfo = ctx->resultRelInfo; + EState *estate = ctx->estate; + ModifyTableState *mtstate = ctx->mtstate; + int i; + + if (nslots <= 0) + return; + + /* Quick exit if no indexes or no triggers */ + if (!(resultRelInfo->ri_NumIndices > 0 || + (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)))) + return; + + /* Caller must take care of opening and closing the indices */ + for (i = 0; i < nslots; i++) + { + /* + * If there are any indexes, update them for all the inserted tuples, + * and run AFTER ROW INSERT triggers. + */ + if (resultRelInfo->ri_NumIndices > 0) + { + List *recheckIndexes; + + recheckIndexes = + ExecInsertIndexTuples(resultRelInfo, + slots[i], estate, false, + false, NULL, NIL, false); + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], recheckIndexes, + mtstate->mt_transition_capture); + list_free(recheckIndexes); + } + + /* + * There's no indexes, but see if we need to run AFTER ROW INSERT + * triggers anyway. + */ + else if (resultRelInfo->ri_TrigDesc != NULL && + (resultRelInfo->ri_TrigDesc->trig_insert_after_row || + resultRelInfo->ri_TrigDesc->trig_insert_new_table)) + { + ExecARInsertTriggers(estate, resultRelInfo, + slots[i], NIL, + mtstate->mt_transition_capture); + } + } +} + /* ---------------------------------------------------------------- * ExecInsert * @@ -751,7 +819,8 @@ ExecInsert(ModifyTableContext *context, TupleTableSlot *slot, bool canSetTag, TupleTableSlot **inserted_tuple, - ResultRelInfo **insert_destrel) + ResultRelInfo **insert_destrel, + bool canMultiInsert) { ModifyTableState *mtstate = context->mtstate; EState *estate = context->estate; @@ -764,6 +833,7 @@ ExecInsert(ModifyTableContext *context, OnConflictAction onconflict = node->onConflictAction; PartitionTupleRouting *proute = mtstate->mt_partition_tuple_routing; MemoryContext oldContext; + bool ar_insert_triggers_executed = false; /* * If the input result relation is a partitioned table, find the leaf @@ -1126,17 +1196,53 @@ ExecInsert(ModifyTableContext *context, } else { - /* insert the tuple normally */ - table_tuple_insert(resultRelationDesc, slot, - estate->es_output_cid, - 0, NULL); + if (canMultiInsert && + proute == NULL && + resultRelInfo->ri_WithCheckOptions == NIL && + resultRelInfo->ri_projectReturning == NULL) + { + if (insert_modify_buffer_flush_context == NULL) + { + insert_modify_buffer_flush_context = + (InsertModifyBufferFlushContext *) palloc0(sizeof(InsertModifyBufferFlushContext)); + insert_modify_buffer_flush_context->resultRelInfo = resultRelInfo; + insert_modify_buffer_flush_context->estate = estate; + insert_modify_buffer_flush_context->mtstate = mtstate; + } - /* insert index entries for tuple */ - if (resultRelInfo->ri_NumIndices > 0) - recheckIndexes = ExecInsertIndexTuples(resultRelInfo, - slot, estate, false, - false, NULL, NIL, - false); + if (table_modify_state == NULL) + { + table_modify_state = table_modify_begin(resultRelInfo->ri_RelationDesc, + TM_FLAG_MULTI_INSERTS, + estate->es_output_cid, + 0, + InsertModifyBufferFlushCallback, + insert_modify_buffer_flush_context); + } + + table_modify_buffer_insert(table_modify_state, slot); + ar_insert_triggers_executed = true; + } + else + { + /* insert the tuple normally */ + table_tuple_insert(resultRelationDesc, slot, + estate->es_output_cid, + 0, NULL); + + /* insert index entries for tuple */ + if (resultRelInfo->ri_NumIndices > 0) + recheckIndexes = ExecInsertIndexTuples(resultRelInfo, + slot, estate, false, + false, NULL, NIL, + false); + + ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes, + mtstate->mt_transition_capture); + + list_free(recheckIndexes); + ar_insert_triggers_executed = true; + } } } @@ -1170,10 +1276,12 @@ ExecInsert(ModifyTableContext *context, } /* AFTER ROW INSERT Triggers */ - ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes, - ar_insert_trig_tcs); - - list_free(recheckIndexes); + if (!ar_insert_triggers_executed) + { + ExecARInsertTriggers(estate, resultRelInfo, slot, recheckIndexes, + ar_insert_trig_tcs); + list_free(recheckIndexes); + } /* * Check any WITH CHECK OPTION constraints from parent views. We are @@ -1869,7 +1977,7 @@ ExecCrossPartitionUpdate(ModifyTableContext *context, /* Tuple routing starts from the root table. */ context->cpUpdateReturningSlot = ExecInsert(context, mtstate->rootResultRelInfo, slot, canSetTag, - inserted_tuple, insert_destrel); + inserted_tuple, insert_destrel, false); /* * Reset the transition state that may possibly have been written by @@ -3364,7 +3472,7 @@ ExecMergeNotMatched(ModifyTableContext *context, ResultRelInfo *resultRelInfo, mtstate->mt_merge_action = action; rslot = ExecInsert(context, mtstate->rootResultRelInfo, - newslot, canSetTag, NULL, NULL); + newslot, canSetTag, NULL, NULL, false); mtstate->mt_merge_inserted += 1; break; case CMD_NOTHING: @@ -3749,6 +3857,10 @@ ExecModifyTable(PlanState *pstate) HeapTupleData oldtupdata; HeapTuple oldtuple; ItemPointer tupleid; + bool canMultiInsert = false; + + table_modify_state = NULL; + insert_modify_buffer_flush_context = NULL; CHECK_FOR_INTERRUPTS(); @@ -3844,6 +3956,10 @@ ExecModifyTable(PlanState *pstate) if (TupIsNull(context.planSlot)) break; + if (operation == CMD_INSERT && + nodeTag(subplanstate) == T_SeqScanState) + canMultiInsert = true; + /* * When there are multiple result relations, each tuple contains a * junk column that gives the OID of the rel from which it came. @@ -4057,7 +4173,7 @@ ExecModifyTable(PlanState *pstate) ExecInitInsertProjection(node, resultRelInfo); slot = ExecGetInsertNewTuple(resultRelInfo, context.planSlot); slot = ExecInsert(&context, resultRelInfo, slot, - node->canSetTag, NULL, NULL); + node->canSetTag, NULL, NULL, canMultiInsert); break; case CMD_UPDATE: @@ -4116,6 +4232,17 @@ ExecModifyTable(PlanState *pstate) return slot; } + if (table_modify_state != NULL) + { + Assert(operation == CMD_INSERT); + + table_modify_end(table_modify_state); + table_modify_state = NULL; + + pfree(insert_modify_buffer_flush_context); + insert_modify_buffer_flush_context = NULL; + } + /* * Insert remaining tuples for batch insert. */ @@ -4228,6 +4355,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) mtstate->mt_merge_updated = 0; mtstate->mt_merge_deleted = 0; + table_modify_state = NULL; + insert_modify_buffer_flush_context = NULL; + /*---------- * Resolve the target relation. This is the same as: * @@ -4681,6 +4811,17 @@ ExecEndModifyTable(ModifyTableState *node) { int i; + if (table_modify_state != NULL) + { + Assert(node->operation == CMD_INSERT); + + table_modify_end(table_modify_state); + table_modify_state = NULL; + + pfree(insert_modify_buffer_flush_context); + insert_modify_buffer_flush_context = NULL; + } + /* * Allow any FDWs to shut down */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index ebde07bcde..11c4d99430 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1226,6 +1226,7 @@ InjectionPointEntry InjectionPointSharedState InlineCodeBlock InProgressIO +InsertModifyBufferFlushContext InsertStmt Instrumentation Int128AggState -- 2.34.1