From 169131f28e09c41b0b100f953b54dd16b2e3185a Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Mon, 29 Jan 2024 11:02:37 +0000 Subject: [PATCH v10 4/4] Use new multi insert TAM for COPY FROM --- src/backend/commands/copyfrom.c | 92 ++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 42 deletions(-) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 1fe70b9133..8abf33aa97 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -77,10 +77,9 @@ /* Stores multi-insert data related to a single relation in CopyFrom. */ typedef struct CopyMultiInsertBuffer { - TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */ + TableInsertState *ti_state; /* Table insert state; NULL if foreign table */ + TupleTableSlot **slots; /* Array to store tuples */ ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */ - BulkInsertState bistate; /* BulkInsertState for this rel if plain - * table; NULL if foreign table */ int nused; /* number of 'slots' containing tuples */ uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy * stream */ @@ -223,14 +222,31 @@ limit_printout_length(const char *str) * ResultRelInfo. */ static CopyMultiInsertBuffer * -CopyMultiInsertBufferInit(ResultRelInfo *rri) +CopyMultiInsertBufferInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri) { CopyMultiInsertBuffer *buffer; buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer)); - memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + + if (rri->ri_FdwRoutine == NULL) + { + int num_slots; + + buffer->ti_state = table_insert_begin(rri->ri_RelationDesc, + miinfo->mycid, + TABLEAM_MULTI_INSERTS | + TABLEAM_BULKWRITE_BUFFER_ACCESS_STRATEGY | + TABLEAM_SKIP_MULTI_INSERTS_FLUSH, + miinfo->ti_options); + buffer->slots = table_multi_insert_slots(buffer->ti_state, &num_slots); + } + else + { + buffer->slots = palloc0(sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES); + buffer->ti_state = NULL; + } + buffer->resultRelInfo = rri; - buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL; buffer->nused = 0; return buffer; @@ -245,7 +261,7 @@ CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo, { CopyMultiInsertBuffer *buffer; - buffer = CopyMultiInsertBufferInit(rri); + buffer = CopyMultiInsertBufferInit(miinfo, rri); /* Setup back-link so we can easily find this buffer again */ rri->ri_CopyMultiInsertBuffer = buffer; @@ -322,8 +338,6 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, int batch_size = resultRelInfo->ri_BatchSize; int sent = 0; - Assert(buffer->bistate == NULL); - /* Ensure that the FDW supports batching and it's enabled */ Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert); Assert(batch_size > 1); @@ -395,13 +409,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, } else { - CommandId mycid = miinfo->mycid; - int ti_options = miinfo->ti_options; bool line_buf_valid = cstate->line_buf_valid; uint64 save_cur_lineno = cstate->cur_lineno; - MemoryContext oldcontext; - - Assert(buffer->bistate != NULL); /* * Print error context information correctly, if one of the operations @@ -409,18 +418,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, */ cstate->line_buf_valid = false; - /* - * table_multi_insert may leak memory, so switch to short-lived memory - * context before calling it. - */ - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - table_multi_insert(resultRelInfo->ri_RelationDesc, - slots, - nused, - mycid, - ti_options, - buffer->bistate); - MemoryContextSwitchTo(oldcontext); + table_multi_insert_flush(buffer->ti_state); for (i = 0; i < nused; i++) { @@ -435,7 +433,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo, cstate->cur_lineno = buffer->linenos[i]; recheckIndexes = ExecInsertIndexTuples(resultRelInfo, - buffer->slots[i], estate, false, + slots[i], estate, false, false, NULL, NIL, false); ExecARInsertTriggers(estate, resultRelInfo, slots[i], recheckIndexes, @@ -493,20 +491,15 @@ CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo, resultRelInfo->ri_CopyMultiInsertBuffer = NULL; if (resultRelInfo->ri_FdwRoutine == NULL) - { - Assert(buffer->bistate != NULL); - FreeBulkInsertState(buffer->bistate); - } + table_insert_end(buffer->ti_state); else - Assert(buffer->bistate == NULL); - - /* Since we only create slots on demand, just drop the non-null ones. */ - for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) - ExecDropSingleTupleTableSlot(buffer->slots[i]); + { + /* Since we only create slots on demand, just drop the non-null ones. */ + for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++) + ExecDropSingleTupleTableSlot(buffer->slots[i]); - if (resultRelInfo->ri_FdwRoutine == NULL) - table_finish_bulk_insert(resultRelInfo->ri_RelationDesc, - miinfo->ti_options); + pfree(buffer->slots); + } pfree(buffer); } @@ -593,13 +586,25 @@ CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo, { CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer; int nused = buffer->nused; + TupleTableSlot *slot; Assert(buffer != NULL); Assert(nused < MAX_BUFFERED_TUPLES); - if (buffer->slots[nused] == NULL) - buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL); - return buffer->slots[nused]; + if (rri->ri_FdwRoutine == NULL) + slot = table_multi_insert_next_free_slot(buffer->ti_state); + else + { + if (buffer->slots[nused] == NULL) + { + slot = table_slot_create(rri->ri_RelationDesc, NULL); + buffer->slots[nused] = slot; + } + else + slot = buffer->slots[nused]; + } + + return slot; } /* @@ -615,6 +620,9 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, Assert(buffer != NULL); Assert(slot == buffer->slots[buffer->nused]); + if (rri->ri_FdwRoutine == NULL) + table_multi_insert_v2(buffer->ti_state, slot); + /* Store the line number so we can properly report any errors later */ buffer->linenos[buffer->nused] = lineno; -- 2.34.1