From 684d846c53dd47e8b1654d8a58e8cca0194bdf12 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy
Date: Mon, 29 Apr 2024 05:35:58 +0000
Subject: [PATCH v20 5/5] Use new multi insert Table AM for COPY FROM

For plain tables, COPY FROM no longer collects tuples in a per-buffer slot
array and hands them to table_multi_insert() with a BulkInsertState.  Each
tuple is instead passed to table_modify_buffer_insert() through a single
reusable slot, and the buffer is flushed with table_modify_buffer_flush().
Index updates and AFTER ROW INSERT triggers now run from the flush callback
CopyModifyBufferFlushCallback(), which is registered with
table_modify_begin().

Foreign tables keep using the per-buffer slot array and
ExecForeignBatchInsert().
---
Review changes on top of the posted v20:
 * CopyMultiInsertBufferInit: initialize currslotno, and set
   modify_buffer_flush_context to NULL for foreign tables; the struct comes
   from palloc(), so both fields were previously left undefined.
 * CopyMultiInsertBufferCleanup: only drop multislot when it was created;
   it is made on first use in CopyMultiInsertInfoNextFreeSlot, so it is
   still NULL for a buffer that never received a tuple.
 * Added comments for the new struct, its fields and the flush callback.
 * Hunk headers and diffstat are adjusted for the added lines; the blob
   hashes on the "index" lines are unchanged from v20.

 contrib/test_decoding/expected/stream.out |   2 +-
 src/backend/commands/copyfrom.c           | 246 ++++++++++++++--------
 src/include/commands/copyfrom_internal.h  |   4 +-
 src/tools/pgindent/typedefs.list          |   1 +
 4 files changed, 171 insertions(+), 82 deletions(-)

diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out
index 4ab2d47bf8..c19facb3c9 100644
--- a/contrib/test_decoding/expected/stream.out
+++ b/contrib/test_decoding/expected/stream.out
@@ -101,10 +101,10 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'incl
  streaming change for transaction
  streaming change for transaction
  streaming change for transaction
- streaming change for transaction
  closing a streamed block for transaction
  opening a streamed block for transaction
  streaming change for transaction
+ streaming change for transaction
  closing a streamed block for transaction
  committing streamed transaction
 (17 rows)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index ce4d62e707..403adfe481 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -71,14 +71,26 @@
 /* Trim the list of buffers back down to this number after flushing */
 #define MAX_PARTITION_BUFFERS	32
 
+/* Context handed to CopyModifyBufferFlushCallback for one relation's buffer */
+typedef struct CopyModifyBufferFlushContext
+{
+	CopyFromState cstate;
+	ResultRelInfo *resultRelInfo;
+	EState	   *estate;
+} CopyModifyBufferFlushContext;
+
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct CopyMultiInsertBuffer
 {
-	TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
+	TableModifyState *mstate;	/* Table insert state; NULL if foreign table */
+	TupleTableSlot **slots;		/* Array to store tuples */
 	ResultRelInfo *resultRelInfo;	/* ResultRelInfo for 'relid' */
-	BulkInsertState bistate;	/* BulkInsertState for this rel if plain
-								 * table; NULL if foreign table */
+	TupleTableSlot *multislot;	/* Slot reused for plain tables; made lazily */
+	CopyModifyBufferFlushContext *modify_buffer_flush_context;	/* Flush callback context; NULL if foreign table */
 	int			nused;			/* number of 'slots' containing tuples */
+	int			currslotno;		/* Current buffered slot number that's being
+								 * flushed; Used to get correct cur_lineno for
+								 * errors while in flush callback. */
 	uint64		linenos[MAX_BUFFERED_TUPLES];	/* Line # of tuple in copy
 												 * stream */
 } CopyMultiInsertBuffer;
@@ -99,6 +111,7 @@ typedef struct CopyMultiInsertInfo
 	int			ti_options;		/* table insert options */
 } CopyMultiInsertInfo;
 
+static void CopyModifyBufferFlushCallback(void *context, TupleTableSlot *slot);
 
 /* non-export function prototypes */
 static void ClosePipeFromProgram(CopyFromState cstate);
@@ -218,14 +231,40 @@ CopyLimitPrintoutLength(const char *str)
  * ResultRelInfo.
  */
 static CopyMultiInsertBuffer *
-CopyMultiInsertBufferInit(ResultRelInfo *rri)
+CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
+						  CopyFromState cstate, EState *estate)
 {
 	CopyMultiInsertBuffer *buffer;
 
 	buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
-	memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		buffer->modify_buffer_flush_context = (CopyModifyBufferFlushContext *) palloc(sizeof(CopyModifyBufferFlushContext));
+		buffer->modify_buffer_flush_context->cstate = cstate;
+		buffer->modify_buffer_flush_context->resultRelInfo = rri;
+		buffer->modify_buffer_flush_context->estate = estate;
+
+		buffer->mstate = table_modify_begin(rri->ri_RelationDesc,
+											TM_FLAG_MULTI_INSERTS |
+											TM_FLAG_BAS_BULKWRITE,
+											miinfo->mycid,
+											miinfo->ti_options,
+											CopyModifyBufferFlushCallback,
+											buffer->modify_buffer_flush_context);
+		buffer->slots = NULL;
+		buffer->multislot = NULL;
+	}
+	else
+	{
+		buffer->mstate = NULL;
+		buffer->modify_buffer_flush_context = NULL;
+		buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
+		buffer->multislot = NULL;
+	}
+
 	buffer->resultRelInfo = rri;
-	buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
 	buffer->nused = 0;
+	buffer->currslotno = 0;
 
 	return buffer;
@@ -236,11 +275,12 @@ CopyMultiInsertBufferInit(ResultRelInfo *rri)
  */
 static inline void
 CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
-							   ResultRelInfo *rri)
+							   ResultRelInfo *rri, CopyFromState cstate,
+							   EState *estate)
 {
 	CopyMultiInsertBuffer *buffer;
 
-	buffer = CopyMultiInsertBufferInit(rri);
+	buffer = CopyMultiInsertBufferInit(miinfo, rri, cstate, estate);
 
 	/* Setup back-link so we can easily find this buffer again */
 	rri->ri_CopyMultiInsertBuffer = buffer;
@@ -273,7 +313,7 @@ CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	 * tuples their way for the first time.
 	 */
 	if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
-		CopyMultiInsertInfoSetupBuffer(miinfo, rri);
+		CopyMultiInsertInfoSetupBuffer(miinfo, rri, cstate, estate);
 }
 
 /*
@@ -317,8 +357,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		int			batch_size = resultRelInfo->ri_BatchSize;
 		int			sent = 0;
 
-		Assert(buffer->bistate == NULL);
-
 		/* Ensure that the FDW supports batching and it's enabled */
 		Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
 		Assert(batch_size > 1);
@@ -390,13 +428,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	}
 	else
 	{
-		CommandId	mycid = miinfo->mycid;
-		int			ti_options = miinfo->ti_options;
 		bool		line_buf_valid = cstate->line_buf_valid;
 		uint64		save_cur_lineno = cstate->cur_lineno;
-		MemoryContext oldcontext;
-
-		Assert(buffer->bistate != NULL);
 
 		/*
 		 * Print error context information correctly, if one of the operations
@@ -404,56 +437,18 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 		 */
 		cstate->line_buf_valid = false;
 
-		/*
-		 * table_multi_insert may leak memory, so switch to short-lived memory
-		 * context before calling it.
-		 */
-		oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-		table_multi_insert(resultRelInfo->ri_RelationDesc,
-						   slots,
-						   nused,
-						   mycid,
-						   ti_options,
-						   buffer->bistate);
-		MemoryContextSwitchTo(oldcontext);
+		Assert(buffer->currslotno <= buffer->nused);
+		buffer->currslotno = 0;
 
-		for (i = 0; i < nused; i++)
-		{
-			/*
-			 * If there are any indexes, update them for all the inserted
-			 * tuples, and run AFTER ROW INSERT triggers.
-			 */
-			if (resultRelInfo->ri_NumIndices > 0)
-			{
-				List	   *recheckIndexes;
-
-				cstate->cur_lineno = buffer->linenos[i];
-				recheckIndexes =
-					ExecInsertIndexTuples(resultRelInfo,
-										  buffer->slots[i], estate, false,
-										  false, NULL, NIL, false);
-				ExecARInsertTriggers(estate, resultRelInfo,
-									 slots[i], recheckIndexes,
-									 cstate->transition_capture);
-				list_free(recheckIndexes);
-			}
+		table_modify_buffer_flush(buffer->mstate);
 
-			/*
-			 * There's no indexes, but see if we need to run AFTER ROW INSERT
-			 * triggers anyway.
-			 */
-			else if (resultRelInfo->ri_TrigDesc != NULL &&
-					 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-					  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
-			{
-				cstate->cur_lineno = buffer->linenos[i];
-				ExecARInsertTriggers(estate, resultRelInfo,
-									 slots[i], NIL,
-									 cstate->transition_capture);
-			}
+		Assert(buffer->currslotno <= buffer->nused);
+		buffer->currslotno = 0;
 
-			ExecClearTuple(slots[i]);
-		}
+		/*
+		 * Indexes are updated and AFTER ROW INSERT triggers (if any) are run
+		 * in the flush callback CopyModifyBufferFlushCallback.
+		 */
 
 		/* Update the row counter and progress of the COPY command */
 		*processed += nused;
@@ -469,6 +464,66 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	buffer->nused = 0;
 }
 
+/*
+ * Flush callback passed to table_modify_begin() for plain tables.
+ *
+ * 'context' is the buffer's CopyModifyBufferFlushContext.  Updates indexes
+ * and runs AFTER ROW INSERT triggers for 'slot', setting cur_lineno from the
+ * buffer's linenos[] so errors report the right input line.
+ */
+static void
+CopyModifyBufferFlushCallback(void *context, TupleTableSlot *slot)
+{
+	CopyModifyBufferFlushContext *ctx = (CopyModifyBufferFlushContext *) context;
+	CopyFromState cstate = ctx->cstate;
+	ResultRelInfo *resultRelInfo = ctx->resultRelInfo;
+	EState	   *estate = ctx->estate;
+	CopyMultiInsertBuffer *buffer = resultRelInfo->ri_CopyMultiInsertBuffer;
+
+	/* Quick exit if no indexes or no triggers */
+	if (!(resultRelInfo->ri_NumIndices > 0 ||
+		  (resultRelInfo->ri_TrigDesc != NULL &&
+		   (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			resultRelInfo->ri_TrigDesc->trig_insert_new_table))))
+		return;
+
+	/* Caller must take care of opening and closing the indices */
+
+	/*
+	 * If there are any indexes, update them for all the inserted tuples, and
+	 * run AFTER ROW INSERT triggers.
+	 */
+	if (resultRelInfo->ri_NumIndices > 0)
+	{
+		List	   *recheckIndexes;
+
+		cstate->cur_lineno = buffer->linenos[buffer->currslotno++];
+		recheckIndexes =
+			ExecInsertIndexTuples(resultRelInfo,
+								  slot, estate, false,
+								  false, NULL, NIL, false);
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, recheckIndexes,
+							 cstate->transition_capture);
+		list_free(recheckIndexes);
+	}
+
+	/*
+	 * There's no indexes, but see if we need to run AFTER ROW INSERT triggers
+	 * anyway.
+	 */
+	else if (resultRelInfo->ri_TrigDesc != NULL &&
+			 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+			  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+	{
+		cstate->cur_lineno = buffer->linenos[buffer->currslotno++];
+		ExecARInsertTriggers(estate, resultRelInfo,
+							 slot, NIL,
+							 cstate->transition_capture);
+	}
+
+	Assert(buffer->currslotno <= buffer->nused);
+}
+
 /*
  * Drop used slots and free member for this buffer.
  *
@@ -489,19 +544,20 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
 
 	if (resultRelInfo->ri_FdwRoutine == NULL)
 	{
-		Assert(buffer->bistate != NULL);
-		FreeBulkInsertState(buffer->bistate);
+		table_modify_end(buffer->mstate);
+		/* multislot is created on first use, so it may still be NULL here */
+		if (buffer->multislot != NULL)
+			ExecDropSingleTupleTableSlot(buffer->multislot);
+		pfree(buffer->modify_buffer_flush_context);
 	}
 	else
-		Assert(buffer->bistate == NULL);
-
-	/* Since we only create slots on demand, just drop the non-null ones. */
-	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
-		ExecDropSingleTupleTableSlot(buffer->slots[i]);
+	{
+		/* Since we only create slots on demand, just drop the non-null ones. */
+		for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
+			ExecDropSingleTupleTableSlot(buffer->slots[i]);
 
-	if (resultRelInfo->ri_FdwRoutine == NULL)
-		table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
-								 miinfo->ti_options);
+		pfree(buffer->slots);
+	}
 
 	pfree(buffer);
 }
@@ -588,13 +644,32 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
 {
 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
 	int			nused = buffer->nused;
+	TupleTableSlot *slot;
 
 	Assert(buffer != NULL);
 	Assert(nused < MAX_BUFFERED_TUPLES);
 
-	if (buffer->slots[nused] == NULL)
-		buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
-	return buffer->slots[nused];
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		if (buffer->multislot == NULL)
+			buffer->multislot = MakeTupleTableSlot(RelationGetDescr(rri->ri_RelationDesc),
+												   &TTSOpsVirtual);
+
+		/* Caller must clear the slot */
+		slot = buffer->multislot;
+	}
+	else
+	{
+		if (buffer->slots[nused] == NULL)
+		{
+			slot = table_slot_create(rri->ri_RelationDesc, NULL);
+			buffer->slots[nused] = slot;
+		}
+		else
+			slot = buffer->slots[nused];
+	}
+
+	return slot;
 }
 
 /*
@@ -608,7 +683,11 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
 
 	Assert(buffer != NULL);
-	Assert(slot == buffer->slots[buffer->nused]);
+
+#ifdef USE_ASSERT_CHECKING
+	if (rri->ri_FdwRoutine != NULL)
+		Assert(slot == buffer->slots[buffer->nused]);
+#endif
 
 	/* Store the line number so we can properly report any errors later */
 	buffer->linenos[buffer->nused] = lineno;
@@ -616,6 +695,14 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	/* Record this slot as being used */
 	buffer->nused++;
 
+	if (rri->ri_FdwRoutine == NULL)
+	{
+		Assert(slot == buffer->multislot);
+		buffer->currslotno = 0;
+
+		table_modify_buffer_insert(buffer->mstate, slot);
+	}
+
 	/* Update how many tuples are stored and their size */
 	miinfo->bufferedTuples++;
 	miinfo->bufferedBytes += tuplen;
@@ -830,7 +917,7 @@ CopyFrom(CopyFromState cstate)
 	/*
 	 * It's generally more efficient to prepare a bunch of tuples for
 	 * insertion, and insert them in one
-	 * table_multi_insert()/ExecForeignBatchInsert() call, than call
+	 * table_modify_buffer_insert()/ExecForeignBatchInsert() call, than call
 	 * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
 	 * However, there are a number of reasons why we might not be able to do
 	 * this.  These are explained below.
@@ -1080,7 +1167,8 @@ CopyFrom(CopyFromState cstate)
 				{
 					if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
 						CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
-													   resultRelInfo);
+													   resultRelInfo, cstate,
+													   estate);
 				}
 				else if (insertMethod == CIM_MULTI_CONDITIONAL &&
 						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index cad52fcc78..14addbc6f6 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -46,9 +46,9 @@ typedef enum EolType
 typedef enum CopyInsertMethod
 {
 	CIM_SINGLE,					/* use table_tuple_insert or ExecForeignInsert */
-	CIM_MULTI,					/* always use table_multi_insert or
+	CIM_MULTI,					/* always use table_modify_buffer_insert or
 								 * ExecForeignBatchInsert */
-	CIM_MULTI_CONDITIONAL,		/* use table_multi_insert or
+	CIM_MULTI_CONDITIONAL,		/* use table_modify_buffer_insert or
 								 * ExecForeignBatchInsert only if valid */
 } CopyInsertMethod;
 
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9582503bb4..0f0ad30188 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -486,6 +486,7 @@ CopyHeaderChoice
 CopyInsertMethod
 CopyMethod
 CopyLogVerbosityChoice
+CopyModifyBufferFlushContext
 CopyMultiInsertBuffer
 CopyMultiInsertInfo
 CopyOnErrorChoice
-- 
2.34.1